1. 概述

本文档的目的是帮助开发人员熟悉Spring Cloud开发。了解Spring Cloud各个组件的基本功能和使用。明确一些常见场景的开发规范。

1.1. 整体架构

微服务整体架构
Figure 1. 微服务整体架构

目前整个架构规划由11个大的组件组成,分别负责微服务中某一方面的特性支撑,后续随着业务发展,可能会进一步加入新的组件。

  1. 服务网关:Spring Zuul。微服务集群的流量入口,主要职责分为两部分:一是对服务应用有感知且重要的功能,例如认证鉴权。二是对服务应用无感知的边缘服务,例如流控、熔断、降级、监控等。

  2. 服务注册和发现:Netflix Eureka。微服务治理的核心。

  3. 配置中心:携程 Apollo。集中化管理应用不同环境、不同集群的配置,并在配置修改后实时推送到应用端。

  4. 认证和授权:基于Spring Security OAuth自研。负责token的管理,如生成,存储和分发等,以及访问权限管理。

  5. 限流熔断和流聚合:Netflix Hystrix。保证微服务高可用,防止微服务分布式系统中的雪崩效应。和Turbine配合监控微服务集群的健康情况。

  6. 微服务开发框架:Spring Boot。没啥可说的,java微服务开发首选。

  7. 数据总线:RabbitMQ Kafka。微服务集群产生的所有数据,包括日志,各种实时的指标数据等,通过Kafka做收集、存储和转发。相当于在微服务集群内部以及内部和外围应用间增加了一个大容量缓冲,能够应对海量数据的场景。RabbitMQ主要用作服务间消息通讯。

  8. 调用链监控:zipkin或者大众点评的cat,zipkin代码无侵入但是功能简单,cat功能强但是有代码侵入。

  9. 指标监控:turbine,服务的指标监控,如吞吐量,TP90,TP99等。

  10. 日志监控:ELK

  11. 服务器健康检查和告警:阿里云自带的监控。

1.2. 项目代码

本文档以一个虚拟的电商网站为案例来演示开发,包括商品,用户,购物车,订单等模块。样例中不会实现完整的业务逻辑,只是在一个业务上下文中用简化代码来演示Spring Cloud相关的特性。

feature分支与文档的索引编号对应,例如:feature/2.2分支的代码,对应文档2.2章节完成后代码的样子。

2. 微服务开发 Spring Boot

2.1. 微服务设计

一个设计良好的微服务应该具有以下特征:

  1. 聚焦.微服务应该都有明确的职责。每个微服务只做一件事,并且把这个事做好。

  2. 松耦合.微服务之间应该通过通用的,语言无关的接口互相调用,如(Http和REST)。在接口声明不变的前提下,每个微服务可以独立的修改和优化。

  3. 数据抽象.微服务应该拥有完全属于自己的数据结构和数据源。属于某个微服务的数据,应该只能通过访问这个微服务来修改。微服务不能直接访问其它微服务的数据源。

  4. 独立.微服务应该可以独立的开发、编译、测试和部署。

为了达到这些目标,在动手开发微服务应用之前,我们应该考虑3方面的问题。

  1. 业务领域如何拆解

  2. 服务粒度如何确定

  3. 服务接口如何定义

这些问题的答案并没有绝对的对错之分,而且往往会随着业务发展变化而变化。但是仍然有一些可供参考的指导方针,帮助我们确定我们的设计。

  • 关于拆解业务领域

    • 描述业务问题或逻辑,注意描述中的名词。在描述一个业务问题时,同一个名词反复出现,通常意味着存在一个核心的业务领域。同时也意味着应该围绕这个业务领域来构建微服务。

    • 关注动词,动词揭示领域模型的行为及轮廓。这在建模时至关重要。

    • 当名词标识出不同的业务域后,应该寻找这些领域中那些高度相关的数据块,并建立聚合。记住,微服务应该拥有完全属于自己的数据结构和数据源。

  • 关于服务粒度

    • 从较大粒度的服务开始,然后通过重构来拆解更小粒度的服务。在缺乏对业务的深刻理解的情况下,服务粒度过细会导致复杂度快速上升。

    • 优先关注你的服务将如何与其它服务交互。真正站在服务调用者的立场上去思考你的服务粒度。

    • 随着你对业务域理解的深入来改变服务的职责。一个粗粒度的服务会随着开发人员对业务的理解而逐渐拆解成细粒度的服务。在这个过程中,粗粒度服务的职责从执行业务逻辑逐渐变为对新的细粒度服务的编排和封装。

  • 服务的坏味道

    • 当一个服务粒度过粗时,你可能会看到下面这些现象:

      • 服务担负了太多职责:一个服务的业务逻辑流程非常复杂,并且在流程上每个环节都有一组要遵循的业务规则。

      • 服务管理的数据跨越了多张表:如果你的服务需要持久化数据到太多张表,这往往意味着你的服务粒度过粗。一般一个微服务不应该操作超过3-5张表。

      • 大量的测试用例:如果一个服务需要用几十甚至上百个单元测试和集成测试用例才能完全覆盖,说明你应该将其重构为较细粒度的服务。

    • 当一个服务粒度过细时,你可能会看到下面这些现象:

      • 基于服务来组装业务逻辑变得复杂而困难。往往需要调用大量的细碎的小服务,并且写很多粘合代码,才能实现业务逻辑。

      • 服务和服务之间重度依赖。为了完成一个请求,几个服务之间反复调来调去。

      • 所有服务都是简单的CRUD。服务是为了表现业务逻辑而存在的,不是为了抽象你的数据层操作。

  • 关于服务接口定义

    • 拥抱REST! 拥抱REST!! 拥抱REST!!!

    • 用URI描述资源,用Method描述行为。

    • 用JSON作为请求和响应

    • 用HTTP状态码表示结果。HTTP协议有标准的状态码来表示服务成功还是失败,学习这些状态码并在所有的微服务中统一使用。不要自己定义状态码,除非是要开发开放平台对第三方提供服务。

2.2. 微服务开发

在我们的例子中,我们要开发一个商品应用,提供一个商品信息服务,根据商品id返回商品信息。

一个简化后的商品服务数据模型如下图:

商品服务数据模型
Figure 2. 商品服务数据模型

真实的数据模型可能会包含10几张甚至更多的表,在一个微服务应用中处理这个量级的数据结构并不是一个好的实践,因为类目,品牌等业务域,都有可能会有独立的业务需求变化,如果因为类目管理的需求变化而更新整个商品应用,显然不符合微服务的设计目标。

当然,在我们的例子中并不会实现整个商品服务的业务逻辑,我们只是指出在微服务开发中需要考虑的一些场景。下面是对业务模型聚合后划分的业务领域,以及依据业务领域建立的项目结构。

业务域划分
Figure 3. 业务域划分

将商品应用划分为3个域,主要是从业务上考虑,就像前面说的,类目,品牌等业务域都有可能会有独立的业务需求变化。而具体的业务域所覆盖的资源,则是根据数据的聚合情况来决定的,比如属性的维护,都是依托类目来进行的,在业务上,我们会根据类目去定义商品属性,不会根据品牌去定义商品属性,也就是说,属性与类目是紧密聚合的,所以属性被划在类目域来管理。

这只是一个简化的例子,真实世界中的情况比这个例子要复杂的多,要考虑的因素也会更多。
项目结构
Figure 4. 项目结构
  1. commodity: POM项目,负责整个项目公用属性的配置,如依赖管理,版本属性等。

  2. commodity-app: 聚合项目,聚合业务模块并整体打包,没有业务逻辑。

  3. commodity-brand: 品牌服务模块

  4. commodity-catalog: 类目服务模块

  5. commodity-product: 商品服务模块

建立多模块项目主要有两点好处:

  • 更明确的依赖关系: 当所有代码都在一个项目里时,开发人员对模块间的依赖并不在意。各个类之间可以随意引用。而从物理上建立模块项目后,如果commodity-product项目要使用commodity-catalog项目的代码,需要明确的在commodity-product项目的pom里添加对commodity-catalog的依赖,这有利于架构师了解并控制模块间的依赖关系。

  • 方便以后拆分: 如果各个模块间是松耦合的,那么将来如果要将类目服务单独部署,只需要将commodity-catalog项目独立打包部署即可。而commodity-app项目只要在pom里去掉对commodity-catalog的依赖,就可以打出不包含类目服务的商品应用。

商品信息服务代码如下:

ProductController.java
@RestController
@RequestMapping("/product")
public class ProductController {

	/**
	 * 根据id获取商品信息
	 *
	 * @author zhailiang@zkh360.com
	 * 2018年6月1日
	 * @param id 商品id
	 * @return
	 */
	@GetMapping("/{id}")
	public ProductInfo getInfo(@PathVariable Long id) {
		ProductInfo info = new ProductInfo();
		info.setId(1L);
		info.setName("product1");
		info.setBrand("brand1");
		info.setCatalog("catalog1");
		return info;
	}

}
CommodityApplication.java
@SpringBootApplication
public class CommodityApplication {

	/**
	 *
	 * @author zhailiang@zkh360.com
	 * 2018年6月1日
	 * @param args
	 */
	public static void main(String[] args) {
		SpringApplication.run(CommodityApplication.class, args);
	}

}

访问结果如下:

第一个服务
Figure 5. 第一个服务

2.3. 微服务文档

增加swagger依赖.

pom.xml
<dependency>
	<groupId>io.springfox</groupId>
	<artifactId>springfox-swagger2</artifactId>
	<version>2.7.0</version>
</dependency>
<dependency>
	<groupId>io.springfox</groupId>
	<artifactId>springfox-swagger-ui</artifactId>
	<version>2.7.0</version>
</dependency>

激活及配置自动生成的文档信息

CommodityApplication.java
@SpringBootApplication
@EnableSwagger2
public class CommodityApplication {

	/**
	 *
	 * @author zhailiang@zkh360.com
	 * 2018年6月1日
	 * @param args
	 */
	public static void main(String[] args) {
		SpringApplication.run(CommodityApplication.class, args);
	}

	@Bean
	public Docket api() {
		return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
				.apis(RequestHandlerSelectors.basePackage("com.zkh360"))
				.paths(PathSelectors.any()).build();
	}

	private ApiInfo apiInfo() {
		return new ApiInfoBuilder().title("商品中心接口文档").description("RESTful 风格接口")
				.version("1.0")
				.build();
	}
}

为服务添加文档信息

ProductController.java
@Api(tags = "商品服务") (1)
@RestController
@RequestMapping("/products")
public class ProductController {

	/**
	 * 根据id获取商品信息
	 *
	 * @author zhailiang@zkh360.com
	 * 2018年6月1日
	 * @param id 商品id
	 * @return
	 */
	@ApiOperation("根据id获取商品信息") (2)
	@ApiImplicitParam(name="id",value="商品ID",required=true, dataType="String",paramType="path") (3)
	@GetMapping("/{id}")
	public ProductInfo getInfo(@PathVariable Long id) {
		ProductInfo info = new ProductInfo();
		info.setId(1L);
		info.setName("product1");
		info.setBrand("brand1");
		info.setCatalog("catalog1");
		return info;
	}

}
1 服务类的说明
2 服务方法的说明
3 服务参数说明,这里是针对path上的参数做说明,也可以直接样@ApiParam对方法参数做说明
ProductInfo.java
@Data
public class ProductInfo {

	/**
	 * id
	 */
	@ApiModelProperty("id") (1)
	private Long id;
	/**
	 * 名称
	 */
	@ApiModelProperty("名称") (1)
	private String name;
	/**
	 * 类目
	 */
	@ApiModelProperty("类目") (1)
	private String catalog;
	/**
	 * 品牌
	 */
	@ApiModelProperty("品牌") (1)
	private String brand;

}
1 针对对象内的属性做说明。
服务文档
Figure 6. 服务文档
服务文档
Figure 7. 服务文档

可以直接填入参数发起请求并查看结果

服务文档
Figure 8. 服务文档

3. 服务发现和调用

我们已经有了商品信息服务。现在假设用户看到商品信息以后,点击“加入购物车”按钮。这时页面会拿着商品id去调用购物车服务。购物车服务收到商品id以后,应该调用商品信息服务,并根据商品信息构建购物车项。那么,购物车服务应该如何调用商品信息服务呢?

3.1. 服务在哪里?

最简单的方式。点对点直接调。这个不讨论了,没人会这么做。

在没有服务发现的概念之前,我们一般会在服务之间加一层负载均衡,如下图所示:

传统的服务位置解决方案
Figure 9. 传统的服务位置解决方案:DNS+LoadBalance

在微服务环境下,这种方案存在以下问题:

  1. 单点失败: 负载均衡不是高可用时,一旦负载均衡不可用,则整个服务网络完全瘫痪。把负载均衡弄成高可用,它也会成为整个架构的阻塞点,因为所有服务调用都会经过这里。

  2. 静态管理: 大部分负载均衡都需要在服务节点变化后重新配置。难以快速的横向扩展,同时难以容错。

  3. 增加复杂性: 在服务提供者和服务调用者之间增加了一层转换层。

基于服务发现的架构可以避免以上问题,如下图所示

基于服务发现的架构
Figure 10. 基于服务发现的架构
基于服务发现的服务调用
Figure 11. 基于服务发现的服务调用

3.2. 基于Spring & Eureka实现服务发现

创建新的maven项目,server-eureka.

pom中添加依赖:

pom.xml
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>

添加配置文件application.yml

application.yml
server:
  port: 8761
eureka:
  client:
    registerWithEureka: false (1)
    fetchRegistry: false (2)
1 是否向Eureka注册,否,因为我们本身就是服务器。
2 是否获取服务注册信息,否,原因同上。

启动类

EurekaServer.java
@SpringBootApplication
@EnableEurekaServer
public class EurekaServer {

	public static void main(String[] args) {
		SpringApplication.run(EurekaServer.class, args);
	}

}

启动后访问 http://localhost:8761/ 看到以下页面表示服务注册中心启动成功。

eureka
Figure 12. eureka

3.3. 向Eureka注册服务

在commodity-app项目的pom.xml中添加依赖

pom.xml
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>

添加配置文件application.yml

application.yml
spring:
  application:
    name: commodity-service (1)

eureka:
  instance:
    preferIpAddress: true (2)
  client:
    registerWithEureka: true (3)
    fetchRegistry: true (4)
    serviceUrl:
      defaultZone: http://127.0.0.1:8761/eureka/ (5)

server:
  port: 8068
1 应用名,服务调用者将通过这个名字来调用服务。
2 使用IP地址(而不是hostname)向eureka注册
3 将项目中的REST服务(@RestController里声明的方法)向eureka注册。
4 获取eureka上的服务注册信息,因为调用都是基于本地的服务信息缓存发起的。所以如果你不需要调别的服务,可以不获取。
5 eureka服务器的地址。

启动服务后在eureka的主页上可以看到如下信息:

eureka
Figure 13. eureka

这说明我们的commodity-app应用已经注册到eureka.

当服务注册到eureka以后,在其可以被调用之前,eureka会等待3次成功的心跳,每次心跳的间隔是10秒。也就是说,服务提供者正常启动以后30秒eureka才会通知调用者服务可用了。同样,当服务实例下线时,eureka也会在3次心跳失败后才会将服务实例从服务列表中剔除。这是Spring Cloud故意设计的,为了防止网络抖动等原因造成服务实例被错误下线。在微服务环境中,服务不可用是必然发生的情况,我们在设计和开发时应该考虑的是如何在服务不可用时熔断和降级,而不是想法保证所有的调用都成功,因为那是不可能的。

访问 http://localhost:8761/eureka/apps 可以看到目前所有注册到erueka的服务及其实例信息,如下图:

eureka
Figure 14. eureka

OK,现在服务已经注册到eureka,下面我们来看一下服务消费者如何调用服务。

3.4. 基于服务发现调用服务

创建shopcart-app应用,并在pom.xml中添加依赖。

pom.xml
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>

添加配置文件application.yml

application.yml
spring:
  application:
    name: shopcart-service

eureka:
  instance:
    preferIpAddress: true
  client:
    registerWithEureka: true
    fetchRegistry: true
    serviceUrl:
      defaultZone: http://127.0.0.1:8761/eureka/

server:
  port: 8066

添加启动类

ShopcartApplication.java
@SpringBootApplication
@EnableEurekaClient
public class ShopcartApplication {

	/**
	 *
	 * @author zhailiang@zkh360.com
	 * 2018年6月1日
	 * @param args
	 */
	public static void main(String[] args) {
		SpringApplication.run(ShopcartApplication.class, args);
	}

}

3.4.1. 使用RestTemplate调用

ShopcartBeanConfig.java
@Configuration
public class ShopcartBeanConfig {

	@LoadBalanced (1)
	@Bean
	public RestTemplate restTemplate() {
		return new RestTemplate();
	}

}
1 这个注解告诉SpringCloud创造一个带负载均衡的RestTemplate
ShopcartController.java
@Slf4j
@RestController
@RequestMapping("/shopcart/item")
public class ShopcartController {

	@Autowired
	private RestTemplate restTemplate;

	/**
	 * 将商品加入购物车,创建购物车项
	 *
	 * @author zhailiang@zkh360.com 2018年6月1日
	 * @param id
	 *            商品id
	 * @return
	 */
	@PostMapping
	public void create(Long productId) {

		String url = "http://commodity-service/products/" + productId; (1)
		ProductInfo info = restTemplate.getForObject(url, ProductInfo.class); (2)

		log.info(info.getName());
	}

}
1 商品信息服务地址,注意这里不是用的商品信息服务的ip和端口,而是商品信息应用的应用名commodity-service
2 使用之前声明的带负载均衡的RestTemplate来调用服务。如果商品信息服务有多个服务实例,restTemplate会自动进行负载均衡
这种客户端负载均衡,实际上是通过ribbon项目实现的,可以通过配置文件来指定负载均衡策略,超时时间等ribbon的属性。
由于在上一节我们把shopcart-app也注册到了eureka,所以实际上,商品服务也可以用同样的方式来调用购物车服务。

3.4.2. 使用FeignClient调用

添加feign依赖

pom.xml
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-feign</artifactId>
</dependency>

添加@EnableFeignClients注解

ShopcartBeanConfig.java
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients (1)
public class ShopcartApplication {

	/**
	 *
	 * @author zhailiang@zkh360.com
	 * 2018年6月1日
	 * @param args
	 */
	public static void main(String[] args) {
		SpringApplication.run(ShopcartApplication.class, args);
	}

}

声明feign客户端

ProductClient.java
@FeignClient("commodity-service") (1)
public interface ProductClient {

	@GetMapping("/products/{id}")
	ProductInfo getInfo(@PathVariable("id") Long id); (2)

}
1 声明这是一个Feign客户端,这个客户端的方法会调用commodity-service应用的服务。
2 声明服务调用,与声明Controller的方式类似。当调用这个方法时,往commodity-service的/products/{id}这个地址发一个GET请求,url中的变量id会用方法的id参数代替
ShopcartController.java
@Slf4j
@RestController
@RequestMapping("/shopcart/item")
public class ShopcartController {

//	@Autowired
//	private RestTemplate restTemplate;

	@Autowired
	private ProductClient productClient; (1)

	/**
	 * 将商品加入购物车,创建购物车项
	 *
	 * @author zhailiang@zkh360.com 2018年6月1日
	 * @param id
	 *            商品id
	 * @return
	 */
	@PostMapping
	public void create(Long productId) {

//		String url = "http://commodity-service/product/" + productId;
//		ProductInfo info = restTemplate.getForObject(url, ProductInfo.class);

		ProductInfo info = productClient.getInfo(productId); (2)

		log.info(info.getName());


	}

}
1 注入feign客户端
2 调用商品信息服务
Feign虽然可以使用SpringMVC的注解来声明,但是它实际上是在声明客户端,不是REST服务,所以在一些细节上还是有不同的。在ShopcartController的testFeign方法中,提供了一组标准的增删改查方法调用样例。

3.4.3. 共享代码

不论是使用RestTemplate方式还是Feign方式,都可以在commodity-app和shopcart-app之间不存在任何依赖的情况下调用彼此的服务,并且在客户端进行负载均衡。

有些开发者会为了避免所有的服务调用者都重复开发服务调用的相关代码而提供服务相关的SDK。这种做法虽然可以降低服务调用者的开发量,但是会造成耦合。

在微服务架构中,解耦 > 代码复用。 所以,除非有特殊需要,或者你提供的服务非常稳定,否则,不要提供你的api的SDK.

4. 消息通讯

考虑下面的场景:当订单下单成功后,我们需要将购物车中的相关物品清除,同时需要扣减订单商品的库存。如果我们直接调用购物车服务和商品库存服务,则会形成订单服务对商品服务和购物车服务的依赖,一旦商品服务或购物车服务不可用,则下单服务也会失败,这是应该避免的。所以我们应该发出一个下单成功的消息,在消息中包含订单的相关信息。然后让购物车应用和商品应用监听这个消息,在收到消息后进行相关的处理。

微服务之间80%的调用都应该是通过消息异步完成的。你应该总是优先考虑使用消息来在微服务应用之间通讯。除非你的业务逻辑必须使用服务调用的返回值才能继续进行。
消息通讯的根本目的是解耦。如果你的微服务不需要调用任何其它服务,只是发出消息,那么我们会认为这是一个高内聚低耦合的服务。这是微服务设计的主要目标之一。
这里会有分布式事务问题(其实同步调用也会有),这个问题我们会在后面专门讨论。

4.1. 消息处理架构

消息处理架构
Figure 15. 消息处理架构
消息生产者

产生消息的应用。消息可以是任何基本类型或自定义类型的对象。消息应该被发送到一个指定的destination

destination

消息发送的目的地。根据消息中间件的不同实现不同。

消息消费者

处理消息的应用。有可能和消息生产者是同一应用。消息消费者以组为单位组织。一个组可以由多个服务实例组成。当一个消息被发送到一个destination时,所有连接到这个destination的组都会收到该消息(发布-订阅模式)。在同一个组中,只有一个服务实例会收到消息(点对点模式)。

4.2. 消息应用架构

消息应用架构
Figure 16. 消息应用架构
Application

微服务应用

Service Bean

spring容器中的bean,一般会包含业务逻辑,通过input和output发送和接收消息。

Input

入栈渠道,Service Bean从这里接收消息,用于屏蔽消息中间件的实现细节。通过接口和注解声明。

Output

出栈渠道,Service Bean从这里发送消息,用于屏蔽消息中间件的实现细节。通过接口和注解声明。

Binder

连接input、output和消息中间价,根据消息中间件的不同,可以通过配置改变Binder的实现。对应用透明,在程序中表现为一组配置。

4.3. 消息生产者开发

  • 创建order-app项目

在项目中加入如下依赖:

pom.xml
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
  • 声明消息对象。

OrderEvent.java
@Getter
@Setter
public abstract class OrderEvent implements Serializable {

	private static final long serialVersionUID = 1L;

	private Long id;

	private List<Long> productIds;

}
OrderCreatedEvent.java
@Data
@EqualsAndHashCode(callSuper=false)
public class OrderCreatedEvent extends OrderEvent {

	private static final long serialVersionUID = 1L;

	private Date createdTime;

}
  • 声明出栈渠道

OrderEventPublisher.java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OrderEventPublisher {

    String ORDER_EVENT_OUTPUT_CHANNEL = "bar"; (1)

    @Output(ORDER_EVENT_OUTPUT_CHANNEL) (2)
    MessageChannel OrderEventOutputChannel();

}
1 出栈渠道的名字
2 通过@Output注解声明出栈渠道,渠道的名字是bar,注意方法的返回值类型。
一个接口中可以声明多个出栈渠道。
  • 注册出栈渠道

OrderApplication.java
@SpringBootApplication
@EnableBinding(OrderEventPublisher.class) (1)
public class OrderApplication {

	public static void main(String[] args) {
		SpringApplication.run(OrderApplication.class, args);
	}

}
1 使用@EnableBinding注册OrderEventPublisher接口,接口中带@Output注解的方法会被自动注册为出栈渠道。

配置出栈渠道(连接kafka)

application.yml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 127.0.0.1:9092 (1)
          zk-nodes: 127.0.0.1:2182 (2)
      bindings:
        bar: (3)
          content-type: application/json (4)
          destination: test (5)
1 kafka服务地址
2 zookeeper服务地址
3 配置名为bar的输出渠道的属性
4 发送消息时序列化的方式,如果消息发送和接受者共享代码,这里可以不配置。默认使用java标准的序列化方式。
5 消息发送的目的地,这里为kafak的topic的名字

使用出栈渠道发送消息

OrderServiceImpl.java
@Service
public class OrderServiceImpl implements OrderService {

	@Autowired
	private OrderEventPublisher orderEventPublisher; (1)

	@Override
	public void create() {
		//do create order

		OrderCreatedEvent orderCreatedEvent = new OrderCreatedEvent(); (2)
		orderCreatedEvent.setId(1L);
		orderCreatedEvent.setProductIds(Lists.newArrayList(1L, 2L, 3L));
		orderCreatedEvent.setCreatedTime(new Date());

		orderEventPublisher.orderEventOutputChannel()
			.send(MessageBuilder.withPayload(orderCreatedEvent).build(), 1000); (3)

	}

}
1 注入OrderEventPublisher
2 构建自定义事件
3 通过名为bar的出栈渠道发送事件.超时时间为1秒,避免因为消息中间件不可用导致系统等待。

4.4. 消息消费者开发

在shopcart-app里加入以下依赖:

pom.xml
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
  • 声明入栈渠道

InputChannels.java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface InputChannels {

	String ORDER_EVENT_INPUT_CHANNEL = "foo"; (1)

    @Input(ORDER_EVENT_INPUT_CHANNEL) (2)
    SubscribableChannel getOrderInputChannel();

}
1 入栈渠道的名字
2 通过@Input注解声明入栈渠道,渠道的名字是foo,注意方法的返回值类型。
一个接口中可以声明多个入栈渠道。
一个接口中可以即声明入栈渠道也声明出栈渠道。
  • 注册入栈渠道

OrderApplication.java
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@EnableBinding(InputChannels.class) (1)
public class ShopcartApplication {

	public static void main(String[] args) {
		SpringApplication.run(ShopcartApplication.class, args);
	}

}
1 使用@EnableBinding注册InputChannels接口,接口中带@Input注解的方法会被自动注册为入栈渠道。

配置入栈渠道(连接kafka)

application.yml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 127.0.0.1:9092 (1)
          zk-nodes: 127.0.0.1:2182 (2)
      bindings:
        foo: (3)
          content-type: application/json (4)
          destination: test (5)
		  group: shopcart (6)
1 kafka服务地址
2 zookeeper服务地址
3 配置名为foo的输入渠道的属性
4 接收消息时序列化的方式,如果消息发送和接受者共享代码,这里可以不配置。默认使用java标准的序列化方式。
5 渠道连接的目的地,也就是消息的来源,这里为kafak的topic的名字
6 应用的group

监听入栈渠道收到的消息

ShopcartListener.java
@Slf4j
@Component
public class ShopcartListener {

	@StreamListener("foo") (1)
	public void onOrderMessage(Message<String> message) {
		log.info("message is :" + message); (2)
	}

}
1 监听名为foo的入栈渠道收到的消息。
2 打印消息。因为我们为渠道配的content-type是application/json,所以这里会收到一个json串。如果消息发送者和消息提供者共享代码,Message的泛型可以直接设置为OrderEvent.

OK,现在每次我们在order-app里调用创建订单方法,shopcart-app都会收到消息并打印。

我们还可以在commodity-app里做同样的设置,使commodity-app也可以收到order-app的消息。

如果commodity-app的入栈渠道配置中,foo的group设置和shopcart-app相同,则每次只会有1个应用收到消息,如果不同,则两个应用都会收到消息。

4.5. 消息事务

考虑以下代码:

ShopcartListener.java
@Service
@Transactional
public class OrderServiceImpl implements OrderService {

	@Autowired
	private OrderEventPublisher orderEventPublisher;

	@Override
	public void create() {
		//do create order

		OrderCreatedEvent orderCreatedEvent = new OrderCreatedEvent();
		orderCreatedEvent.setId(1L);
		orderCreatedEvent.setProductIds(Lists.newArrayList(1L, 2L, 3L));
		orderCreatedEvent.setCreatedTime(new Date());

		orderEventPublisher.orderEventOutputChannel()
			.send(MessageBuilder.withPayload(orderCreatedEvent).build(), 1000); (1)

		throw new RuntimeException("test");

	}
}

我们在1处发送了消息,但是后面的程序抛出了异常。就算发送消息的代码是方法的最后一行,这也是有可能发生的。比如create()方法是由Spring管理事务的,在事务提交执行SQL时抛出了异常。那么这时,create()方法里的数据库操作会被回滚,而消息已经发出,这时就会导致消息的生产者和消费者之间数据不一致。所以这里我们需要消息渠道支持事务,只有在数据库事务成功提交以后才发送消息。

当前版本的Spring Cloud Stream,只有在Broker是RabbitMQ时才支持消息事务。

将所有的依赖改为:

pom.xml
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

修改消息生产者和消息消费者的配置文件,注释掉kafka连接信息,改为连接rabbitmq

消息生产者配置如下:

application.yml
spring:
  cloud:
	stream:
	  binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: amqp://127.0.0.1:5672
                username: sa
                password: 123456
      bindings:
        bar:
          binder: rabbit
          destination: test
          content-type: application/json
      rabbit:
        binddings:
          bar:
            producer:
              transacted: true (1)
1 出栈渠道bar支持消息事务

OK,重新配置后的渠道会支持事务。只有create()方法的事务成功提交后才会发送消息。

4.6. 性能

当消息生产的速度大于消费的速度时,会产生消息堆积,消息堆积多了,就内存溢出了,所以需要有机制来提高消息消费者的消费能力。

4.6.1. 并发

最简单的方式就是多线程,同时开多个线程来处理消息,配置如下:

application.yml
spring:
  cloud:
    stream:
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: amqp://127.0.0.1:5672
                username: sa
                password: 123456
      bindings:
        foo:
          binder: rabbit
          destination: test
          content-type: application/json
          group: shopcart
          consumer:
            concurrency: 2 (1)
1 对于foo渠道进入的消息开2个线程来并发处理。

4.6.2. 消息分片

消息最终是点对点的模式来消费的,也就是说,在一个group中,只会有一台机器得到消息,而一台机器开线程的能力终究有限,当多线程也无法满足消费速度要求时,就需要对消息进行分片,把不同片的消息分到同一个group的不同机器上去处理。

暂时用不到,有时间再写…​…​

4.6.3. 消息可靠性投递

消息可靠性方案

消息可靠性方案

为确保消息成功投递,设计保障消息可靠性投递方案如下

  1. 数据入库:将需要发送的数据保存至生产端数据库中,生成需要发送的消息

  2. 首次发送消息:生产端第一次发送消息到mq broker中,此条消息将会被消费端消费

  3. 二次发送消息:生产端第二次发送的消息到mq broker中,此条消息用于验证消息是否被 消费端成功消费,此条消息将会被回调服务端监听,用于与消费端消费消息后回送的消息进行比较判断,本次所发送的消息必须与首次发送的消息完全一致

  4. 消费端消费消息:消费端消费生产端首次发送的消息

  5. 消费端生产消息:消费端成功消费消息后,需要将消息的完整数据重新发送到mq broker中

  6. 消息入库:回调服务端监听消费端生产的消息,将信息保存至消息数据库中

  7. 消息检查:回调服务端监听到生产端二次发送的消息后,将消息与消息数据库中数据进行对比,如果消息数据库中没有此条消息,则需要通知生产端重新发送消息

此方案中主要涉及到三个部分:生产端,消费端,回调服务端

  1. 生产端:生产消息共需发送两次消息

  2. 消费端:消费消息并且需要投递一次消息

  3. 回调服务端:消息入库操作以及根据消息是否存在消息数据库判断是否需要重发消息

生产端 声明常量类对象

Constants .java
public final class Constants {

	public static final String rabbit_addresses = "mq.dev.zkh360.com:50017";           (1)
	public static final String rabbit_username = "zkhdev";                             (2)
	public static final String rabbit_password = "zkhdev";                             (3)
	public static final String rabbit_virtual_host = "/";                              (4)

	public static final String rabbit_exchangename = "exchange_first_meaasge";         (5)
	public static final boolean rabbit_exchangedurable = true;                         (6)
	public static final boolean rabbit_exchangeautodelete = true;                      (7)
	public static final boolean rabbit_ignoreDeclarationExceptions = true;             (8)
	public static final String rabbit_queuename = "queue_first_message";               (9)
	public static final boolean rabbit_queuedurable = true;                            (10)
	public static final String rabbit_routingkey = "routing.first.# ";                 (11)
	public static final String rabbit_key = "routing.first.message";                   (12)

}
1 rabbitmq服务器地址
2 rabbitmq服务器用户名
3 rabbitmq服务器密码
4 rabbitmq服务器虚拟主机
5 生产者端首次生产消息交换机名称
6 生产者端首次生产消息交换机持久化
7 生产者端首次生产消息交换机不自动删除
8 生产者端首次生产消息忽略交换机声明错误异常
9 生产者端首次生产消息接收队列名称
10 生产者端首次生产消息队列持久化
11 生产者端首次生产消息路由键
12 生产者端首次生产消息路由名称

声明配置类对象

RabbitMQConfig .java
@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {

	@Bean
	public ConnectionFactory connectionFactory(){                                           (1)
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setAddresses(Constants.rabbit_addresses);
		connectionFactory.setUsername(Constants.rabbit_username);
		connectionFactory.setPassword(Constants.rabbit_password);
		connectionFactory.setVirtualHost(Constants.rabbit_virtual_host);
		return connectionFactory;
	}

	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {                   (2)
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}

    @Bean
	public TopicExchange firstExchange() {                                                  (3)
		Map<String, Object> arguments = new HashMap<String, Object>();
		arguments.put("ignoreDeclarationExceptions", Constants.rabbit_ignoreDeclarationExceptions);
		return new TopicExchange(Constants.rabbit_exchangename, Constants.rabbit_exchangedurable,
				Constants.rabbit_exchangeautodelete, arguments);
	}

    @Bean
	public Queue firstQueue() {                                                             (4)
		return new Queue(Constants.rabbit_queuename, Constants.rabbit_queuedurable);
	}

    @Bean
    public Binding firstBinding() {                                                         (5)
        return BindingBuilder.bind(firstQueue()).to(firstExchange()).with(Constants.rabbit_routingkey);
    }

}
1 获取rabbitmq连接
2 定义rabbitAdmin对象
3 定义topic类型交换机
4 定义队列
5 定义交换机与队列绑定

声明消息生产端对象

ProducerMessage .java
@Component
 public class ProducerMessage {

	@Autowired
	private RabbitAdmin rabbitAdmin;

	@Autowired
	private Binding firstBinding;

    @Autowired
	private UpDownStream upDownStream;

	/**
	 * 声明交换机,队列及建立关联
	 */

     rabbitAdmin.declareBinding(firstBinding);                                           (1)

/**
	 *
	 * 消费端发送消息
	 * @param message              发送的消息
	 * @param exchangename         自定义交换机名称
	 * @param routingname          自定义路由名称
	 * @return                     消息id
	 */


	String messageid = upDownStream.upSeMessage(message,exchangename,routingname);      (2)

}
1 声明队列及交换机并建立关联
2 调用发送消息方法并接收返回值
  1. 注入rabbitAdmin,firstBinding,upDownStream。

  2. 参数为需要发送的消息message,交换机名称,路由名称。

  3. 通过sendMessage发送消息并接受返回值,返回值为消息唯一识别标识,需要将返回值保存以便用于后续查询。

消费端 声明常量类对象

Constants .java
public final class Constants {

	public static final String rabbit_addresses = "mq.dev.zkh360.com:50017";           (1)
	public static final String rabbit_username = "zkhdev";                             (2)
	public static final String rabbit_password = "zkhdev";                             (3)
	public static final String rabbit_virtual_host = "/";                              (4)

	public static final String rabbit_exchangename = "exchange_first_meaasge";         (5)
	public static final boolean rabbit_exchangedurable = true;                         (6)
	public static final boolean rabbit_exchangeautodelete = true;                      (7)
	public static final boolean rabbit_ignoreDeclarationExceptions = true;             (8)
	public static final String rabbit_queuename = "queue_first_message";               (9)
	public static final boolean rabbit_queuedurable = true;                            (10)
	public static final String rabbit_routingkey = "routing.first.# ";                 (11)

}
1 rabbitmq服务器地址
2 rabbitmq服务器用户名
3 rabbitmq服务器密码
4 rabbitmq服务器虚拟主机
5 消费端消费消息交换机名称
6 消费端消费消息交换机持久化
7 消费端消费消息交换机不自动删除
8 消费端消费消息忽略交换机声明错误异常
9 消费端消费消息接收队列名称
10 消费端消费消息队列持久化
11 消费端消费消息路由键
消费端消费消息Exchange,Queue需要与生产端一致

声明配置类对象

RabbitMQConfig .java
@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {

	@Bean
	public ConnectionFactory connectionFactory(){                                           (1)
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setAddresses(Constants.rabbit_addresses);
		connectionFactory.setUsername(Constants.rabbit_username);
		connectionFactory.setPassword(Constants.rabbit_password);
		connectionFactory.setVirtualHost(Constants.rabbit_virtual_host);
		return connectionFactory;
	}

	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {                   (2)
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}

    @Bean
	public TopicExchange firstExchange() {                                                  (3)
		Map<String, Object> arguments = new HashMap<String, Object>();
		arguments.put("ignoreDeclarationExceptions", Constants.rabbit_ignoreDeclarationExceptions);
		return new TopicExchange(Constants.rabbit_exchangename, Constants.rabbit_exchangedurable,
				Constants.rabbit_exchangeautodelete, arguments);
	}

    @Bean
	public Queue firstQueue() {                                                             (4)
		return new Queue(Constants.rabbit_queuename, Constants.rabbit_queuedurable);
	}

    @Bean
    public Binding firstBinding() {                                                         (5)
        return BindingBuilder.bind(firstQueue()).to(firstExchange()).with(Constants.rabbit_routingkey);
    }

}
1 获取rabbitmq连接
2 定义rabbitAdmin对象
3 定义topic类型交换机
4 定义队列
5 定义交换机与队列绑定

声明消息消费端对象

ConsumerMessage .java
@Component
 public class ConsumerMessage {

	@Autowired
	private RabbitAdmin rabbitAdmin;

	@Autowired
	private Binding firstBinding;


	/**
	 * 声明交换机,队列及建立关联
	 */

     rabbitAdmin.declareBinding(firstBinding);                                                                              (1)

/**
	 * callbackservice consumer message
	 * @param message
	 * @param channel
	 * @param headers
	 * @return
	 * @throws Exception
	 */

	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(value = "${spring.rabbitmq.listener.first.queue.name}",                                          (2)
			durable="true"),
			exchange = @Exchange(value = "${spring.rabbitmq.listener.first.exchange.name}",                                 (3)
			durable="true",
			type= "topic",
			ignoreDeclarationExceptions = "true"),
			key = "${spring.rabbitmq.listener.first.key}"                                                                   (4)
			)
	)
	@RabbitHandler
	public void consumerMessage(Message message,Channel channel) throws Exception {
		// TODO                                                                                                             (5)
	}

}
1 声明交换机,队列及建立关联
2 消费端需要获取消息的队列名称
3 消费端需要获取消息的交换机名称
4 消费端需要获取消息的路由键名称
5 自定义实现消费消息方法,但必须含有@RabbitListener注解
消费端自定义实现含有@RabbitListener注解的消费消息方法,引入confirmmessage.jar完成消费消息之后发送一条确认消息至mq broker

回调服务端 创建updownApp应用,并在pom.xml中添加依赖。

pom.xml
<dependency>
	    <groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.1.26</version>
</dependency>
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

添加配置文件application.properties

application.properties
server.servlet.context-path=/
server.port=8090
spring.rabbitmq.addresses=mq.dev.zkh360.com:50017                              (1)
#spring.rabbitmq.host=mq.dev.zkh360.com                                        (2)
#spring.rabbitmq.port=50017                                                    (3)
spring.rabbitmq.username=zkhdev                                                (4)
spring.rabbitmq.password=zkhdev                                                (5)
spring.rabbitmq.virtual-host=/                                                 (6)
spring.rabbitmq.connection-timeout=15000                                       (7)
spring.rabbitmq.template.retry.enabled=true                                    (8)
spring.rabbitmq.template.retry.initial-interval=2s                             (9)

#producer                                                                      (10)
spring.rabbitmq.publisher-confirms=true                                        (11)
spring.rabbitmq.publisher-returns=true                                         (12)
spring.rabbitmq.template.mandatory=true                                        (13)

#consumer                                                                      (14)
spring.rabbitmq.listener.retry.enabled=true                                    (15)
spring.rabbitmq.listener.simple.acknowledge-mode=manual                        (16)
spring.rabbitmq.listener.simple.concurrency=5                                  (17)
spring.rabbitmq.listener.simple.max-concurrency=10                             (18)

#second send message                                                           (19)
spring.rabbitmq.listener.second.exchange.name=exchange_second_meaasge          (20)
spring.rabbitmq.listener.second.exchange.durable=true                          (21)
spring.rabbitmq.listener.second.exchange.type=topic                            (22)
spring.rabbitmq.listener.second.exchange.ignoreDeclarationExceptions=true      (23)
spring.rabbitmq.listener.second.routingkey=routing.second.#                    (24)
spring.rabbitmq.listener.second.key=routing.second.message                     (25)
spring.rabbitmq.listener.second.queue.name=queue_second_message                (26)
spring.rabbitmq.listener.second.queue.durable=true                             (27)
spring.rabbitmq.listener.second.queue.ttl=600000                               (28)

#second send message to dxl meaasge                                            (29)
spring.rabbitmq.listener.dxl.exchange.name=exchange_dxl_meaasge                (30)
spring.rabbitmq.listener.dxl.exchange.durable=true                             (31)
spring.rabbitmq.listener.dxl.exchange.type=fanout                              (32)
spring.rabbitmq.listener.dxl.exchange.ignoreDeclarationExceptions=true         (33)
spring.rabbitmq.listener.dxl.queue.name=queue_dxl_message                      (34)
spring.rabbitmq.listener.dxl.queue.durable=true                                (35)

#consumer send confim message                                                  (36)
spring.rabbitmq.listener.confim.exchange.name=exchange_confim_meaasge          (37)
spring.rabbitmq.listener.confim.exchange.durable=true                          (38)
spring.rabbitmq.listener.confim.exchange.type=topic                            (39)
spring.rabbitmq.listener.confim.exchange.ignoreDeclarationExceptions=true      (40)
spring.rabbitmq.listener.confim.routingkey=routing.confim.#                    (41)
spring.rabbitmq.listener.confim.key=routing.confim.message                     (42)
spring.rabbitmq.listener.confim.queue.name=queue_confim_message                (43)
spring.rabbitmq.listener.confim.queue.durable=true                             (44)
1 rabbitmq服务器地址
2 可将rabbitmq服务器地址拆分成ip
3 可将rabbitmq服务器地址拆分成port
4 rabbitmq服务器用户名
5 rabbitmq服务器密码
6 rabbitmq服务器虚拟主机
7 rabbitmq服务器超时时间设置
8 生产者端的重试
9 生产者端重试的时间间隔
10 生产端配置
11 开启发送消息到exchange确认机制
12 开启发送消息到exchange返回监听机制
13 开启发送消息到不存在exchange消息仍然存在机制
14 消费端相关配置
15 消费端重试的时间间隔
16 设置消费端签收机制为手工签收
17 消费者最小数量
18 消费者最大数量
19 生产者端二次延迟确认生产消息相关配置
20 生产者端二次延迟确认生产消息交换机名称
21 生产者端二次延迟确认生产消息交换机持久化
22 生产者端二次延迟确认生产消息交换机路由方式topic
23 生产者端二次延迟确认生产消息忽略交换机声明错误异常
24 生产者端二次延迟确认生产消息路由键
25 生产者端二次延迟确认生产消息路由名称
26 生产者端二次延迟确认生产消息接收队列名称
27 生产者端二次延迟确认生产消息队列持久化
28 生产者端二次延迟确认生产消息队列中消息存活时间
29 生产者端二次延迟确认消息实际消息存储相关配置
30 生产者端二次延迟确认消息实际消息存储交换机名称
31 生产者端二次延迟确认消息实际消息存储交换机持久化
32 生产者端二次延迟确认消息实际消息存储交换机路由方式fanout
33 生产者端二次延迟确认消息实际消息存储忽略交换机声明错误异常
34 生产者端二次延迟确认消息实际消息存储接收队列名称
35 生产者端二次延迟确认消息实际消息存储队列持久化
36 消费端生产确认消息相关配置
37 消费端生产确认消息交换机名称
38 消费端生产确认消息交换机持久化
39 消费端生产确认消息交换机路由方式topic
40 消费端生产确认消息忽略交换机声明错误异常
41 消费端生产确认消息路由键
42 消费端生产确认消息路由名称
43 消费端生产确认消息接收队列名称
44 消费端生产确认消息队列持久化
Exchange命名方式:exchange_*_message(*可设置为first,second,confirm,dxl)。
Queue命名方式:Queue*_message(*可设置为first,second,confirm,dxl)。
routingKey命名方式:routing_*_#(*可设置为first,second,confirm,dxl)。
exchange与queue均采用持久化方式,除死信队列路由方式采用fanout外其余队列路由方式采用topic。
exchange的ignoreDeclarationExceptions均设置为true,忽略exchange声明异常。
设置生产端二次消息接收队列中消息生存时间为10分钟,超时消息将路由至延迟确认消息队列。
生产者端首次生产消息相关配置中涉及Exchange,Queue,Routingkey都可以自定义名称。

声明核心配置类

MainConfig.java
@Configuration
@ComponentScan({"com.barry.springboot.*"})
public class MainConfig {

}

声明回调服务对象

UpDownStream.java
@Component
public class UpDownStream {

	@Autowired
	private RabbitTemplate rabbitTemplate;


	@Value("${spring.rabbitmq.listener.second.exchange.name}")
	private String secondexchange;
	@Value("${spring.rabbitmq.listener.second.routingkey}")
	private String secondroutingKey;
	@Value("${spring.rabbitmq.listener.confim.exchange.name}")
	private String confimexchange;
	@Value("${spring.rabbitmq.listener.confim.routingkey}")
	private String confimroutingKey;
	@Value("${spring.rabbitmq.listener.confim.exchange.name}")

	/**
	 *
	 * 生产端发送消息方法
	 * @param message              发送的消息
	 * @param exchangename         自定义交换机名称
	 * @param routingname          自定义路由名称
	 * @return                     消息id
	 */
	public String upSeMessage(Object message,String exchangename,String routingname) {
		String messageid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
		Map<String, Object> properties = new HashMap<String, Object>();
		properties.put("messageid", messageid);
		properties.put("exchangename", exchangename);
		properties.put("routingname", routingname);
		MessageHeaders messageHeaders = new MessageHeaders(properties);                              (1)
		Message msg = MessageBuilder.createMessage(ordermessage, messageHeaders);                    (2)
		CorrelationData cd = new CorrelationData(messageid);                                         (3)
		rabbitTemplate.convertAndSend(exchangename, routingname, msg, cd);                           (4)
		rabbitTemplate.convertAndSend(secondexchange, secondroutingKey, msg, cd);                    (5)
	}



	/**
	 * 中间服务消费消费端确认消息
	 * @param message
	 * @param channel
	 * @param headers
	 * @return
	 * @throws Exception
	 */

	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(value = "${spring.rabbitmq.listener.confim.queue.name}",
			durable="${spring.rabbitmq.listener.confim.queue.durable}"),
			exchange = @Exchange(value = "${spring.rabbitmq.listener.confim.exchange.name}",
			durable="${spring.rabbitmq.listener.first.exchange.durable}",
			type= "${spring.rabbitmq.listener.first.exchange.type}",
			ignoreDeclarationExceptions = "${spring.rabbitmq.listener.first.exchange.ignoreDeclarationExceptions}"),
			key = "${spring.rabbitmq.listener.first.key}"
			)
	)
	@RabbitHandler
	public void callBackConsumerMessage(@Payload Message message,
			Channel channel,
			@Headers Map<String, Object> headers) throws Exception {
		System.err.println("---------------callbackservice-----------------------");

		String msg = message.getPayload().toString();
		String messageid = (String) message.getHeaders().get("messageid");
		int type = Integer.parseInt(messageid)%8;
		// TODO 根据type将messageid,msg保存至业务数据库中                                           (6)
		Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
		//手工ACK
		channel.basicAck(deliveryTag, false);                                                        (7)

	}

	/**
	 * 中间服务消费生产端二次发送的确认消息
	 * @param message
	 * @param channel
	 * @param headers
	 * @return
	 * @throws Exception
	 */

	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(value = "${spring.rabbitmq.listener.dxl.queue.name}",
			durable="${spring.rabbitmq.listener.dxl.queue.durable}"),
			exchange = @Exchange(value = "${spring.rabbitmq.listener.dxl.exchange.name}",
			durable="${spring.rabbitmq.listener.dxl.exchange.durable}",
			type= "fanout",
			ignoreDeclarationExceptions = "${spring.rabbitmq.listener.dxl.exchange.ignoreDeclarationExceptions}")
			)
	)
	@RabbitHandler
	public void callBackDelayMessage(@Payload Message message,
			Channel channel,
			@Headers Map<String, Object> headers) throws Exception {
		System.err.println("---------------callbackservice-----------------------");
		// TODO 监听到生产端二次发送的消息与msg DB中数据比较(使用messageid)
		String messageid = (String) message.getHeaders().get("messageid");                          (8)
		// 如数据库中无数据则调用upSeMessage发送数据
		String newMsgid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
		CorrelationData cd = new CorrelationData(newMsgid);
		message.getHeaders().put("messageid", newMsgid);                                            (9)
		String exchangename = (String) message.getHeaders().get("exchangename");
		String routingname = (String) message.getHeaders().get("routingname");
		rabbitTemplate.convertAndSend(exchangename, routingname, message, cd);                      (10)
		rabbitTemplate.convertAndSend(secondexchange, secondroutingKey, message, cd);               (11)
		Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
		//手工ACK
		channel.basicAck(deliveryTag, false);                                                       (12)

	}

}

<1>封装消息headers

<2>封装消息

<3>消息唯一id

<4>生产第一条消息至普通队列

<5>生产第二条消息至设置了消息超时时间的队列

<6>将消息id,消息根据保存至不同数据表中

<7>手工确认签收消息

<8>监听生产端二次发送的延迟确认消息并根据消息id与业务数据库内容进行比较

<9>修改消息id,封装需要重发的消息

<10>重发消息

<11>重发验证消息

<12>手工确认签收消息

声明消费端辅助发送消息对象

DownSendMessage.java
@Aspect
@Order(-99)
@Component
public class DownSendMessage {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	private String confimexchange;
	@Value("${spring.rabbitmq.listener.confim.routingkey}")
	private String confimroutingKey;
	@Value("${spring.rabbitmq.listener.confim.exchange.name}")

	@After("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
    public void sendConfirmMessage(JoinPoint point) {
       rabbitTemplate.convertAndSend(confimexchange, confimroutingKey, point.getArgs());             (1)
    }

}

<1>消费端消费消息后发送确认消息

将此类生成jar包(confirmmessage.jar)供消费端调用,已完成消费端成功消费消息后发送一条确认消息至mq broker

回调服务方案设计 回调服务端提供发送消息方法(为保障消息可靠性投递,生产段需要发送两次消息,由回调服务端提供方法发送具体消息操作),消息保存至数据库方法(消息需要保存到数据库中用于后续查询等,因此回调服务端在接收到消费端发送的确认消息后需要消费确认消息并将消息保存到数据库中),延迟消息比较方法(为验证消息是否被消费端消费,需要验证生产端发送的二次确认消息是否已经存在数据库中)

  1. 为保障消息投递成功,消息生产端需要发送两次消息,第一次为消费端消费的消息,第二次为用于判断消息是否消费成功的延迟确认消息。实现这种情况有两种方式可以选择

    • 消息生产端发送第一条消息,设置延迟任务发送第二条消息:如实现此种方式需要第一次发送消息后将消息保存,使用aop在第一次发送消息方法执行后异步发送第二次消息

    • 由回调服务端提供方法发送两条消息到mq broker 第一种方式需要保存消息直到第二次发送成功才可释放,还需要设置定时任务发送第二条消息会增加开发复杂度,数据量较大时也会增加生产端性能压力,因此选择第二种方式

  2. 回调服务端发送两条消息时,第一条消息发送至普通队列即可,第二条消息可选择发送至

    • 普通队列

    • 设置了消息生存时间的队列 第二条消息是用于做延迟确认,因此需要与第一条消息到达mq broker有时间差,之前已经选择由回调服务端连续发送两条消息,因此选择将第二条消息发 送到设置消息生存时间的队列起到延时的作用,消息生存超时将路由到死信队列,回调服务端只需要监听死信队列即可对消息比较判断

  3. 回调服务端需要监听消费端消费消息成功后重发的消息,并保存至消息数据库,可选择

    • 消费端主动发送消息

    • 消费端调用回调服务端方法发送消息

    • aop方法在消费端消费消息方法执行后发送消息 消费端回发的消息需要回调服务端监听并保存至数据库,如选择消费端主动发送消息,需要消费端将接收到的消息拆解重组并指定交换机及路由名称会增加消费端处理操作,如选择消费端调用回调服务端方法发送消息可能会发生消费端成功消费消息但回送消息未成功发送的情况,因此选择消费端成功消费后使用切片方法发送消息

  4. 回调服务端需要对延迟确认消息比较确定是否需要重发消息,如果消息数据库没有延迟确认消息则

    • 调用生产端方法重发消息

    • 回调服务端对接收到的延迟确认消息拆分重组后重新发送 如果选择调用生产端方法,则需要将消息重新发送至生产端,生产端拆解消息内容,重新组装消息后在调用回调服务端提供方法发送消息,过程比较反复,会增加生产端压力。如果选择第二种方式则只需要在获取到消息不存在是,改变接受到消息的uuid即可重新发送,增加处理效率

  5. 消费端消费消息后需要发送一条确认消息到mq broker中则

    • 消费端发送消息到mq broker

    • 消费端调用回调服务端方法发送消息到mq broker

    • 消费端通过引入切片方法发送消息到mq broker 消费端消费消息后需要将接收到的消息发送到mq broker中用于判断,因此接收到的消息不需要任何的处理即可发送,考虑到减少消费端开发难度,以及调用方法中间过程可能出现如网络突然断开,回调服务端出现响应异常等情况,选取切片方法的方式,发送确认消息

  6. 回调服务端需要将消费端回送的方法保存至数据库,如果将全部消息都保存至一张表中,会造成消息量大时数据库插入和查询速度降低等问题,因此对数据库的设计采用8张表保存。消息需要有唯一的标识,方案采取uuid作为消息的唯一标识,根据uuid对8取余的结果将消息分别保存至消息表中,回调服务端设置定时任务在每周日0时将8张表中数据备份至相对应的8张表中。表命名方式为n_message,表定义如下

数据库定义

数据库定义
除了生产端第一次发送消息所使用的交换机,队列及路由方式需要生产者消费者声明,其余使用到的交换机,队列,路由方式均有回调服务端声明。生产端,消费端所定义的交换机,队列及路由方式需要一致,如果可以确定已经存在则生产端及消费端可以省去交换机,队列及路由方式的相关定义和声明绑定

5. 异步编程

想象下面的场景:

  1. 商品中心提供一个服务,接收一组商品id,返回一组商品信息。搜索,浏览历史等功能会调这个服务。

  2. 商品信息由商品基本信息和商品价格信息组成。

  3. 获取商品基本信息需要耗时100毫秒,获取价格信息也需要耗时100毫秒

伪代码实现如下:

ProductController.java
public List<ProductInfo> getInfo(List<Long> ids) {
	return ids.stream().map(id -> {
		ProductInfo info = new ProductInfo();
		info.setBasicInfo(getProductBasicInfo(id));
		info.setPriceInfo(getProductPriceInfo(id));
		return info;
	}).collect(toList());
}

private ProductPriceInfo getProductPriceInfo(Long id) {
	try {
		Thread.sleep(100);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	return new ProductPriceInfo();
}

private ProductBasicInfo getProductBasicInfo(Long id) {
	try {
		Thread.sleep(100);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	return new ProductBasicInfo();
}

这段代码的问题在于它的耗时是随着传入的id数量线性增长的,传入10个id的耗时会是传入1个id的耗时的10倍。而这些操作其实是没有互相依赖关系的,10个id可以同时去取数据,获取基本信息和获取价格信息也可以同时进行。异步编程可以解决这个问题。

异步编程的目的是压榨应用服务器性能,缩短服务响应时间。消息通讯的目的是解耦,附带缩短服务响应时间效果。

5.1. Future

Future接口在Jdk1.5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,但运算结束后,这个引用被返回给调用方。在Future中触发那些耗时的操作,把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。打个比方你可以把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你什么时候你的衣服会洗好(这张发票就是Future)。衣服干洗的同时,你可以去做其他的事情。Future的另一个优点是它比更底层的Thread更易用。要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,就万事大吉了。

使用Future以异步方式执行一个耗时的操作
ExecutorService executor = Executors.newCachedThreadPool(); (1)
Future<String> future = executor.submit(new Callable<String>() { (2)
	@Override
	public String call() throws Exception {
		return doSomeLongOperation(); (3)
	}
});

doSomethingElse(); (4)

String result = future.get(1, TimeUnit.SECONDS); (5)
1 创建 ExecutorService 通过它你可以向线程池提交任务
2 ExecutorService 提交一个Callable对象
3 以异步方式在新的线程中执行耗时的操作
4 异步操作进行的同时,你可以做其他的事情
5 获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出

这种编程方式让你的线程可以在 ExecutorService 以并发方式调用另一个线程执行耗时操作的同时,去执行一些其它的任务。接着,如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则他会阻塞你的线程,直到操作完成,返回相应的结果。

为了防止长时间运行的操作永远不返回导致线程永远等待,你应该优先使用重载版本的get方法,它接受一个超时的参数,通过它,你可以定义你的线程等待Future结果的最长时间,而不是永无止境的等待下去。

Future虽然简化了异步开发,但是在很多复杂的场景下,使用Future还是很麻烦的,所以在Jdk1.8中,引入了CompletableFuture,大大加强了使用Future处理复杂场景的能力。

5.2. 分支合并框架

分支/合并框架在Jdk1.7中被引入,分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(ForkJoinPool)中的工作线程。

5.2.1. 使用RecursiveTask

要把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果的类型,如果任务不返回结果,则是RecursiveAction类型。要定义RecursiveTask,只需要实现它唯一的抽象方法compute。这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:

if (任务足够小或不可分) {
	顺序计算该任务
} else {
	将任务分成两个子任务
	递归调用本方法,拆分每个子任务,等待所有子任务完成
	合并每个子任务的结果
}

拆分的过程如图所示:

async

你可能已经注意到,这只不过是著名的分治算法的并行版本而已。这里举一个用分支/合并框架的实际例子,让我们试着用这个框架为一个数字范围(这里用一个long[]表示)求和。

ForkJoinSumCalculator.java
public class ForkJoinSumCalculator extends RecursiveTask<Long> { (1)
	/**
	 *
	 */
	private static final long serialVersionUID = -4008111329500774405L;

	private final long[] numbers; (2)
	private final int start; (3)
	private final int end;

	private static final long LIMIT = 10000; (4)

	public ForkJoinSumCalculator(long[] numbers) { (5)
		this(numbers, 0, numbers.length);
	}

	private ForkJoinSumCalculator(long[] numbers, int start, int end) { (6)
		this.numbers = numbers;
		this.start = start;
		this.end = end;
	}

	@Override
	protected Long compute() { (7)
		int length = end - start; (8)
		if(length <= LIMIT) {
			return computeSum(); (9)
		}

		ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); (10)
		leftTask.fork(); (11)

		ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); (12)

		Long rightResult = rightTask.compute(); (13)
		Long leftResult = leftTask.join(); (14)

		return rightResult + leftResult; (15)
	}

	private Long computeSum() { (16)
		long sum = 0;
		for (int i = start; i < end; i++) {
			sum += numbers[i];

		}
		return sum;
	}

}
1 继承 RecursiveTask 来创建可以用于分支/合并框架的任务
2 要求和的数组
3 子任务处理的数组的起始和终止位置
4 不再将任务分解为子任务的数组大小
5 公共构造函数用于创建主任务
6 私有构造函数用于以递归方式为主任务创建子任务
7 覆盖 RecursiveTask 抽象方法
8 该任务负责求和的部分的大小
9 如果大小小于或等于阈值,则不再拆分任务,顺计算结果
10 创建一个子任务来为数组的前一半求和
11 利用另一个线程异步执行新创建的子任务
12 创建一个子任务来为数组的后一半求和
13 同步执行第二个子任务,有可能允许进一步递归划分
14 读取第一个子任务的结果,如果尚未完成就等待
15 该任务的结果时两个子任务结果的组合
16 在子任务不再可分时结算结果的简单算法

现在编写一个方法来并行对前n个自然数求和就很简单了。你只需要把想要的数字数组传给ForkJoinSumCalculator的构造函数

Main.java
public class Main {

	public static void main(String[] args) {
		long[] numbers = LongStream.rangeClosed(0, 100000000).toArray(); (1)
		ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); (2)
		long now = System.currentTimeMillis();
		System.out.println(new ForkJoinPool().invoke(task)); (3)
		System.out.println("耗时:" + (System.currentTimeMillis() - now));
	}

}
1 生成一个一千万个元素的数组
2 用这个数组构造一个ForkJoinSumCalculator
3 执行任务并打印计算用时
在实际应用中,使用多个ForkJoinPool是没有什么意义的。一般应该把它声明为单例对象。在例子中创建时使用了其默认的无参数构造函数,这意味着想让线程池使用JVM能够使用的所有处理器。更确切的说,该构造函数将使用Runtime.getRuntime().availableProcessors()的返回值来决定线程池使用的线程数。请注意availableProcessors方法虽然看起来是处理器,但它实际上返回的是可用内核的数量。

5.2.2. 工作窃取

在上面的ForkJoinSumCalculator例子中,我们决定在要求和的数组中最多包含1万个项目时就不再创建子任务了。这个选择是很随意的,但是大多数情况下你很难找到一个方法来确定这个阈值,只能试几个不同的值来尝试优化他。在我们的测试案例中,我们用了一个有1千万项目的数组,意味着ForkJoinSumCalculator至少会分出1000个子任务来。这似乎有点浪费资源,因为我们用来运行它的机器上只有四个内核。在这个特定例子中可能确实是这样,因为所有的任务都受CPU约束,预计所花的时间也差不多。

但分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。不行的是,实际中,每个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是不可预知的原因,比如磁盘访问慢,或是需要和外部服务协调执行。

分支/合并框架用一种被称为工作窃取的技术来解决这个问题。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是他的队列已经空了,而其他的线程还很忙。这是,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直持续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小人物而不是少数几个大任务,这有助于更好的在工作线程之间平衡负载。

5.2.3. 最佳实践

虽然分支/合并框架还算简单易用,但是它也很容易被误用。以下是几个有效使用它的最佳做法:

  1. 对一个任务调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。

  2. 不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。

  3. 对子任务调用fork方法会把它排进ForkJoinPool。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率比直接对其中一个调用compute低。调用compute可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。

  4. 调试使用分支/合并框架的并行计算可能有点棘手。特别是你平常都喜欢在IDE里面看栈跟踪来找问题,但放在分支-合并计算上就不行了,因为调用compute的线程并不是概念上的调用方,而是调用fork的那个。

  5. 你不应该想当然的认为在多核处理器上使用分支/合并框架就比顺序计算快。只有在一个任务可以分解为多个独立的子任务,并且所有这些子任务的运行时间都比分出新任务所花的时间长时,才能让性能在并行化是有所提升。另外,分支/合并框架需要"预热"或者说要执行几遍才会被JIT编译器优化。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死码分析:删去从未被使用的计算)

5.3. 并行流

流(Stream)是Jdk1.8中引入的新接口,它可以让你编写功能强大的代码,用声明性的方式处理数据集。你可以通过对收集源调用parallelStream方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定的操作的工作负荷分配给多核处理器的所有内核,让它们都忙起来。并行流可以使你无需显式的处理线程和同步问题。

还是让我们以之前的例子为例,假设你需要写一个方法,接收数字n为参数,并返回从1到给定参数的所有数字的和。来看一下并行流如何处理这个问题。

并行流归纳
public static long parallelSum(long n) {
	return Stream.iterate(1L, i -> i + 1)
			.limit(n)
			.parallel() (1)
			.reduce(0L, Long::sum);
}
1 将流转换为并行流

在上面的代码中,对流中所有数字求和的归纳过程的执行方式和之前介绍的 分支/合并框架 类似。实际上,并行流内部使用了默认的ForkJoinPool.

你可以通过 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12") 来改变ForkJoinPool线程池的大小,但是这是一个全局设置,它将影响代码中所有的并行流,目前还无法专为某个并行流指定线程数。一般而言,让ForkJoinPool的大小等于处理器数量是一个不错的默认值,除非你有很好的理由,否则强烈建议你 不要 修改它。

5.3.1. 测量流性能

我们声称并行求和方法应该比顺序求和方法性能好。然而在软件工程上,靠猜绝对不是什么好方法。特别是在性能优化时,你应该始终遵循三个黄金规则:测量,测量,再测量。下面我们会用3种方式实现这个数字求和的方法,并且比较它们的性能,然后来解释为什么会出现这种情况。

顺序流归纳
public static long sequenceSum(long n) {
	return Stream.iterate(1L, i -> i + 1)
			.limit(n)
			.reduce(0L, Long::sum);
}
for循环迭代
public static long iterativeSum(long n) {
	long result = 0;
	for (int i = 1; i <= n; i++) {
		result += i;
	}
	return result;
}
性能测试方法
public static long test(Function<Long, Long> computer, long n) { (1)
	long fastest = Long.MAX_VALUE;
	for (int i = 0; i < 10; i++) {
		long start = System.currentTimeMillis();
		computer.apply(n);
		long duration = System.currentTimeMillis() - start;
		if (duration < fastest) {
			fastest = duration;
		}
	}
	return fastest;
}

public static void main(String[] args) {
	long n = 20_000_000;
	System.out.println("顺序归纳耗时:"+test(SumComputer::sequenceSum, n));
	System.out.println("for循环归纳耗时:"+test(SumComputer::iterativeSum, n));
	System.out.println("并行归纳耗时:"+test(SumComputer::parallelSum, n));
}
1 执行传入的计算策略,反复运行10次,取最短耗时返回。

以上代码在运行时的结果如下:

顺序归纳耗时:212
for循环归纳耗时:14
并行归纳耗时:395

这相当令人失望,求和方法的并行版本比顺序版本要慢很多,比for循环的版本甚至慢了一个数量级,为什么会这样呢?这里实际上有两个问题。

  1. Stream的iterate方法生成的是装箱的对象,必须拆箱成数字才能求和。

  2. 我们很难把iterate分成多个独立块来并行执行。

第二个问题更有意思一点,因为你必须意识到某些流操作比其他操作更容易并行化。具体来说,iterate很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果。这意味着,整个数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。这时把流标记为并行,你其实是给顺序处理增加了开销,因为它要把每次求和操作分到一个不同的线程上。

这就说明了并行流编程可能会很复杂,有时甚至有点违反直觉。如果用得不对(比如采用了一个不易并行化的操作,如iterate),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的parallel操作时,了解背后到底发生了什么是很有必要的。

使用更有针对性的方法

那到底要怎么利用多核处理器,用流来高效的并行求和呢?以我们的例子为例,让我们使用 LongStream的rangeClosed方法来生成要被处理的流,这个方法与iterate相比有两个优点。

  1. 直接产生原始类型的long数字,没有装箱拆箱的开销。

  2. 直接生成所有的数字,很容易拆分成独立的小块

让我们把顺序归纳和并行归纳方法修改一下,然后重新运行测试方法,看一看最终的结果。

改版后的方法
public static long parallelSum(long n) {
	return LongStream.rangeClosed(0, n).parallel().reduce(0L, Long::sum);
}

public static long sequenceSum(long n) {
	return LongStream.rangeClosed(0, n).reduce(0L, Long::sum);
}

运行后的结果

顺序归纳耗时:10
for循环归纳耗时:14
并行归纳耗时:2

终于,我们得到了一个比顺序执行快的多的并行归纳。这也表明,使用正确的数据结构然后使其并行工作能够保证最佳的性能。

尽管如此,请记住,并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核间传递数据的时间长。总而言之,很多情况下不可能或不方便并行化。然而,在使用并行流加速代码之前,你必须确保用得对,如果结果错了,算得快就毫无意义了。让我们来看一个常见的陷阱。

5.3.2. 正确使用并行流

错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。下面是另一种实现求和的方法,但这回改变一个共享累加器:

错误的累加方法
public static long wrongSum(long n) {
	Accumulator accumulator = new Accumulator();
	LongStream.rangeClosed(0, n).parallel().forEach(accumulator::add);
	return accumulator.total;
}

public class Accumulator {

	public long total = 0;

	public void add(long value) {
		total += value;
	}

}
修改后的测试方法
public static long test(Function<Long, Long> computer, long n) {
		long fastest = Long.MAX_VALUE;
		for (int i = 0; i < 10; i++) {
			long start = System.currentTimeMillis();
			long sum = computer.apply(n);
			System.out.println("result is : "+ sum); (1)
			long duration = System.currentTimeMillis() - start;
			if (duration < fastest) {
				fastest = duration;
			}
		}
		return fastest;
	}

	public static void main(String[] args) {
		long n = 20_000_000;
		System.out.println("并行归纳耗时:" + test(SumComputer::wrongSum, n)); (2)
	}
1 把结果打印出来
2 测试错误的计算方法

运行结果:

result is : 33255658390054
result is : 23361526526524
result is : 28233192819211
result is : 24704892209221
result is : 17979642901590
result is : 16276812559576
result is : 32748310460098
result is : 29463940930891
result is : 24479655494703
result is : 29568875372455
并行归纳耗时:3

这回方法的性能无关紧要了,唯一要紧的是每次执行都会返回不同的结果,都离正确值差了十万八千里。这是由于多个线程在同时访问累加器,执行 total += value ,而这一句看似简单,却不是一个原子操作。问题的根源在于,forEach中调用的方法有副作用,他会改变多个线程共享对象的可变状态。要是你想用并行流又不想引发类似的意外,就必须避免这种情况。

5.3.3. 高效使用并行流

一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的,因为任何类似于“仅当至少由一千个(或一百万或随便什么数字)元素的时候才用并行流”的建议对于某台特定机器上的某个特定操作可能是对的,但在略有差异的另一种情况下可能就是大错特错。尽管如此,我们至少可以提出一些定性意见,帮你决定某个特定情况下是否有必要使用并行流。

  1. 如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。我们在本节中已经指出,并行流并不总是比顺序流块。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的测量来检查其性能。

  2. 留意装箱。自动装箱和拆箱操作会大大降低性能。Jdk8中有原始类型流(IntStream,LongStream,DoubleStream)来避免这种操作,但凡有可能都应该用这些流。

  3. 有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如:findAny会比findFirst性能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit方法可能会比单个有序流(比如数据源是一个List)更高效。

  4. 要考虑流的操作流水线的总计成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是流操作的总成本。Q值较高就意味着使用并行流时性能好的可能性比较大

  5. 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。

  6. 要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。最后,你可以自己实现Spliterator来完全掌握分解过程。

  7. 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如一个有限的流可以分成大小相等的两部分,这样每个部分都可以比较高效的并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。

  8. 还要考虑终端操作中合并步骤的代价是大是小。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

  9. 最后,并行流背后的基础架构是jdk7中的分支/合并框架框架。如果要正确使用并行流,一定要了解它的内部原理。

5.4. CompletableFuture

本章中的演示是在8核cpu上进行的,你需要根据你的cpu的核数增减商店的数量,才能看到相同的效果

通过前面Future的介绍,我们看到Future接口可以为异步计算建模,等待异步操作结果,以及获取计算的结果。但是这些特性还不足以让你编写简洁的并发代码。比如,我们很难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与以另一个查询操作的结果合并”。但是,使用Future中提供的方法完成这样的操作又是另外一回事。这也是我们需要更具描述能力的特性的原因,比如下面这些:

  1. 将两个异步计算合并为一个。这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。

  2. 等待Future集合中的所有任务都完成。

  3. 仅等待Future集合中最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。

  4. 通过编程方式完成一个Future任务的执行,即以手工设定异步操作结果的方式。

  5. 应对Future的完成事件。即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果。

在Jdb8中,引入了新的 CompletableFuture 类(它实现了Future接口),这个类提供的新特性会以更直观的方式将上述需求都变为可能。 CompletableFutureStream 的设计类似,它们都使用了Lambda表达式以及流水线的思想。

为了展示 CompletableFuture 的特性,我们会创建一个名为“最佳价格查询器”的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。在这个过程中,你会学到几个重要的技能。

  1. 首先,你会学到如何为你的客户提供异步API

  2. 其次,你会掌握如何让你使用了同步API的代码变为非阻塞代码。你会了解如何使用流水线将两个接续的异步操作合并为一个异步计算操作。这种情况肯定会出现,比如,在线商店返回了你想要购买商品的原始价格,并附带着一个折扣代码,最终,要计算出该商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。

  3. 你还会学到如何以响应式的方式处理异步操作的完成事件,以及随着各个商店返回它的商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待所有的商店都返回他们各自的价格。(一旦某家商店的服务中断,程序可能会阻塞很长时间)

使用CompletableFuture一般用于异步聚合多个操作的结果,如果你在用CompletableFuture,但是并不关心被异步化的API的返回结果,这可能暗示着你应该用消息通讯。

5.4.1. 基本使用

Shop.java
public class Shop {

	//省略掉一些代码,只看关键部分

	public static void delay() { (1)
		try {
			Thread.sleep(1000L);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	private double calculatePrice(String product) { (2)
		delay();
		return random.nextDouble() * 100;
	}

	public Future<Double> getPriceAsync(String product) {
		CompletableFuture<Double> futurePrice = new CompletableFuture<>(); (3)
		new Thread(() -> futurePrice.complete(calculatePrice(product))).start(); (4)
		return futurePrice; (5)
	}

	public static void main(String[] args) throws Exception {
		Shop shop = new Shop("BestShop");
		long start = System.currentTimeMillis();
		Future<Double> futurePrice = shop.getPriceAsync("some product"); (6)
		System.out.println("调用返回,耗时:"+ (System.currentTimeMillis() - start));
		double price = futurePrice.get(); (7)
		System.out.println("价格返回,耗时"+ (System.currentTimeMillis() - start));
	}
}
1 模拟远程调用,延迟1秒
2 计算价格,延迟1秒后返回一个0到100的随机数
3 创建 CompletableFuture 对象,它会包含计算的结果
4 在另一个线程中异步执行计算,并将结果设入 CompletableFuture 对象
5 无需等待还没结束的计算,直接返回 CompletableFuture 对象
6 调用异步查询方法
7 Future 对象中读取价格,如果价格未知,会发生阻塞

执行结果为

调用返回,耗时:60
价格返回,耗时1062

我们看到, getPriceAsync 方法的调用返回的时间远远早于最终价格计算完成的时间,调用者可以利用这段时间并行的去执行其它操作。

getPriceAsync 方法中我们看到了如何通过编程创建 CompletableFuture 对象以及如何获取返回值,虽然看起来这些操作已经比较方便,但还有进一步提升的空间, CompletableFuture 类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节。比如,采用 supplyAsync 方法后,你可以用一行语句重写 getPriceAsync 方法

public Future<Double> getPriceAsync(String product) {
	return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

supplyAsync 方法接受一个生产者( Supplier )作为参数,返回一个 CompletableFuture 对象,生产者方法会交由 ForkJoinPool 池中的某个执行线程运行,但是你也可以使用 supplyAsync 方法的重载版本,传递第二个参数指定不同的线程池,后面我们会介绍如何使用自定义的线程池来改善程序的性能。

你应该总是使用 CompletableFuture.supplyAsync 方式来建立 CompletableFuture,因为 supplyAsync 方法内部会对生产者方法产生的异常进行处理,避免你的线程因为异常而永久的阻塞。

5.4.2. 与流共舞

现在我们假设 Shop 类是由别人编写的,你不能修改其代码,并且只提供了同步阻塞式的价格查询服务 getPrice

public double getPrice(String product) {
	return calculatePrice(product);
}

然后我们拥有一个如下所示的商家列表:

private List<Shop> shops = Arrays.asList(
			new Shop("shop1"),
			new Shop("shop2"),
			new Shop("shop3"),
			new Shop("shop4"),
			new Shop("shop5"),
			new Shop("shop6"),
			new Shop("shop7"),
			new Shop("shop8"));

而你需要实现下面这样一个方法,它接受产品名称作为参数,返回一个字符串列表,字符串中包含商店的名称和该商店中指定商品的价格。

public List<String> findPrices(String product)

第一个实现方案可能是这样的:

public List<String> findPrices(String product) {
	return shops.stream()
			.map(shop -> String.format("s% price is %.2f", shop.getName(), shop.getPrice(product)))
			.collect(Collectors.toList());
}

好吧,这段代码看起来非常直白,让我们看一下它的执行时间,通过这些数据,我们可以比较优化后的性能提升。

public static void main(String[] args) {
	PriceService priceService = new PriceService();
	long start = System.currentTimeMillis();
	System.out.println(priceService.findPrices("iPhone7"));
	System.out.println("服务耗时:" + ( System.currentTimeMillis() - start ));
}
顺序流执行结果:
[shop1 price is 55.87, shop2 price is 97.27, shop3 price is 67.66, shop4 price is 33.26, shop5 price is 75.97, shop6 price is 6.33, shop7 price is 55.41, shop8 price is 27.31]
服务耗时:8122

方法的执行时间比8秒多了几毫秒,这是因为对这8个商店的查询时顺序进行的,每个操作都会花费大约1秒左右的时间计算请求商品的价格。下面让我们来改进下。如果你看了上面并行流的介绍,估计你会马上想到的第一个方法就是用并行流来避免顺序计算,比如这样:

public List<String> findPrices(String product) {
	return shops.parallelStream() (1)
			.map(shop -> String.format("s% price is %.2f", shop.getName(), shop.getPrice(product)))
			.collect(Collectors.toList());
}
1 将顺序流改为并行流
并行流执行结果:
[shop1 price is 59.49, shop2 price is 36.49, shop3 price is 98.52, shop4 price is 97.15, shop5 price is 95.31, shop6 price is 44.09, shop7 price is 79.06, shop8 price is 46.93]
服务耗时:1096

非常好,看起来这是个简单但有效的主意,下面让我们尝试用 CompletableFuture 来将对不同商店的同步调用替换为异步调用。

public List<String> findPrices(String product) {

	List<CompletableFuture<String>> priceFuture = shops.stream()
			.map(shop -> CompletableFuture
					.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))) (1)
			.collect(Collectors.toList());

	return priceFuture.stream().map(CompletableFuture::join) (2)
					.collect(Collectors.toList());

}
1 使用 CompletableFuture 以异步方式计算每种商品的价格
2 等待所有异步操作结束

这段代码中有一些需要特别注意的地方。首先,第二个流中对所有future对象执行join操作,一个接一个地等待他们运行结束。注意join方法和Future接口中的get有相同的含义,唯一的不同是join不会抛出任何受检查异常。使用它你不再需要使用try/catch语句来捕获get抛出的异常。

另外,这里使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操作,这其实是有缘由的。如果你在一个流水线中处理流,那么,每一个 CompletableFuture 只能在前一个操作结束(join)之后才能执行自己的查询操作,实际上还是顺序执行的。后面我们会学习如何用更简单的方式解决这个问题,现在先来看一下这个实现的执行结果:

CompletableFuture执行结果
[shop1 price is 66.04, shop2 price is 18.37, shop3 price is 59.30, shop4 price is 8.32, shop5 price is 28.74, shop6 price is 37.62, shop7 price is 86.39, shop8 price is 38.87]
服务耗时:2101

这个结果让人相当失望,它虽然比顺序执行的阻塞版本快,但是它的用时是使用并行流的版本的两倍。尤其考虑到,我们从顺序流改到并行流只做了非常小的改动,而从流改到 CompletableFuture 的代码则要复杂的多,这个结果就更让人沮丧。但这就是全部的真相么?让我们继续前进,看看我们漏掉了什么重要的东西。

首先让我们把店铺的数量增加到9个(cpu核心数 + 1),结果如下:

顺序流版本
[shop1 price is 87.89, shop2 price is 36.51, shop3 price is 82.72, shop4 price is 69.09, shop5 price is 4.34, shop6 price is 23.98, shop7 price is 57.36, shop8 price is 12.24, shop9 price is 62.96]
服务耗时:9131
并行流版本
[shop1 price is 5.36, shop2 price is 36.72, shop3 price is 97.03, shop4 price is 94.33, shop5 price is 68.00, shop6 price is 5.25, shop7 price is 95.47, shop8 price is 87.43, shop9 price is 54.61]
服务耗时:2103
CompletableFuture版本
[shop1 price is 61.00, shop2 price is 35.18, shop3 price is 42.30, shop4 price is 41.78, shop5 price is 30.40, shop6 price is 85.33, shop7 price is 47.67, shop8 price is 76.25, shop9 price is 33.26]
服务耗时:2102

这次并行流版本的程序比之前多消耗了1秒左右的时间,这因为并行流背后依赖ForkJoinPool,它的默认最大线程数是cpu核数,所以,前8个商店的查询占用了全部8个线程,第九个商店只能等到前面某一个商店的查询完成释放出空闲线程才能继续。

然后让我们把店铺的数量增加到17个(cpu核心数 * 2 + 1),结果如下:

顺序流版本
[shop1 price is 61.57, shop2 price is 91.58, shop3 price is 58.98, shop4 price is 39.04, shop5 price is 64.09, shop6 price is 27.32, shop7 price is 57.81, shop8 price is 1.31, shop9 price is 84.04, shop10 price is 48.88, shop11 price is 69.18, shop12 price is 12.32, shop13 price is 6.16, shop14 price is 19.90, shop15 price is 65.88, shop16 price is 93.24, shop17 price is 87.87]
服务耗时:17150
并行流版本
[shop1 price is 29.90, shop2 price is 5.93, shop3 price is 95.08, shop4 price is 66.59, shop5 price is 39.69, shop6 price is 1.30, shop7 price is 43.02, shop8 price is 29.97, shop9 price is 55.02, shop10 price is 59.06, shop11 price is 2.18, shop12 price is 92.35, shop13 price is 17.54, shop14 price is 60.41, shop15 price is 51.93, shop16 price is 42.96, shop17 price is 17.97]
服务耗时:3106
CompletableFuture版本
[shop1 price is 75.68, shop2 price is 8.09, shop3 price is 15.17, shop4 price is 50.71, shop5 price is 79.93, shop6 price is 60.15, shop7 price is 57.31, shop8 price is 73.88, shop9 price is 83.04, shop10 price is 58.33, shop11 price is 60.96, shop12 price is 35.27, shop13 price is 87.92, shop14 price is 39.18, shop15 price is 52.01, shop16 price is 91.07, shop17 price is 82.01]
服务耗时:3100

我们看到,随着并行流版本使用时间的增加, CompletableFuture 版本所用时间也在增加,这是因为它们内部采用的是同样的线程池。然而, CompletableFuture 具有一定的优势,因为它允许你对执行器(Excutor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。让我们看看怎样利用这种配置上的灵活性带来实际应用性能上的提升。

Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100)); (1)

List<CompletableFuture<String>> priceFuture = shops.stream()
		.map(shop -> CompletableFuture.supplyAsync(
				() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)), executor)) (2)
		.collect(Collectors.toList());

return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
1 声明一个线程数等于商店数量的执行器,最大线程数100
2 将这个执行器作为 CompletableFuture.supplyAsync 方法的第二个参数

改进以后,17个商店的运行结果:

[shop1 price is 44.29, shop2 price is 2.80, shop3 price is 46.62, shop4 price is 9.91, shop5 price is 65.43, shop6 price is 30.60, shop7 price is 81.37, shop8 price is 30.10, shop9 price is 32.39, shop10 price is 13.35, shop11 price is 74.37, shop12 price is 80.42, shop13 price is 19.05, shop14 price is 1.27, shop15 price is 94.85, shop16 price is 23.90, shop17 price is 94.46]
服务耗时:1101

一般而言,这种耗时1秒的状态会一直持续,直到商店数目超过执行器的最大线程数100. 而在这个过程中,每增加8个商店,并行流的耗时就会增加1秒。这个例子证明了要创建更适合你的应用特性的执行器,利用 CompletableFuture.supplyAsync 向其提交任务执行是个不错的主意。处理需要大量使用异步操作的情况时,这几乎是最有效的策略。

调整线程池的大小

线程池的大小非常重要,如果线程池中线程数量过多,最终他们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数量过少,处理器的一些核可能就无法充分利用。线程池大小与处理器的利用率之比可以使用下面的公式进行估算:

T = N * U * (1 + W/C)

其中:

  • N是处理器的核的数量,可以通过Runtime.getRuntime().availableProcessors()得到

  • U是期望的CPU利用率(该值应该介于0-1之间)

  • W/C是等待时间与计算时间的比率

拿我们的例子举例,我的机器有8个核,则N是8,我们的getPrice方法单独执行时耗时1010毫秒,其中1000毫秒是等待时间,那么等待时间和计算时间的比是 1000/10 = 100, 那么,如果我希望CPU利用率 70% 的话,线程数应该是 8 * 0.7 * 100 = 560

而在实际操作中,如果你创建的线程数比商店的数目更多,反而是一种浪费。因为多出的线程根本没机会用到,所以,最佳的策略应该是创建的线程数与商店数目相等,最大不超过560.

使用并行流还是CompletableFuture?

目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在 CompletableFuture 内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。对这些API的使用建议如下:

  1. 如果你进行的是计算密集型的操作,并没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的。如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程。

  2. 反之,如果你并行的工作单元还涉及等待I/O的操作(如网络连接,数据库访问等),那么使用 CompletableFuture 灵活性更好,你可以像前文讨论的那样,依据等待/计算的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

现在你已经了解了如何利用 CompletableFuture 提供异步API,以及如何将一个同步又缓慢的服务转换为异步的服务。不过到目前为止,我们每个Future中运行的都是单次的操作。下一节中,你会看到如何将多个异步操作结合在一起细,以流水线的方式运行。

5.4.3. 连接多个异步任务

让我们假设我们的商店都同意使用一个统一的集中式折扣服务。该服务提供五个不同的折扣代码,每个折扣代码对应不同的折扣率。然后所有的商店现在返回价格时还会返回一个折扣代码,现在getPrice返回的字符串的格式是ShopName:Price:DiscountCode。

Discount.java
public enum Discount {

	NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

	private final int percentage;

	Discount(int percentage) {
		this.percentage = percentage;
	}
}
Shop.java
public String getPrice(String product) {
	double price = calculatePrice(product);
	Discount discount = Discount.values()[random.nextInt(Discount.values().length)];
	return String.format("%s:%.2f:%s", name, price, discount);
}

封装getPrice返回的字符串

Quote.java
public class Quote {

	private final String shop;
	private final double price;
	private final Discount discount;

	public Quote(String shop, double price, Discount discount) {
		this.shop = shop;
		this.price = price;
		this.discount = discount;
	}

	public static Quote parse(String content) {
		String[] items = StringUtils.splitByWholeSeparatorPreserveAllTokens(content, ":");
		return new Quote(items[0], Double.parseDouble(items[1]), Discount.valueOf(items[2]));
	}
}

折扣服务

DiscountService.java
public class DiscountService {

	public static String applyDiscount(Quote quote) {
		return quote.getShop() + " price is " + apply(quote.getPrice(), quote.getDiscount());
	}

	private static String apply(double price, Discount discount) {
		delay(); (1)
		return NumberFormat.getInstance().format(price * (100 - discount.getPercentage()) / 100);
	}
}
1 折扣服务也有1秒的延时。

现在折扣服务已经准备好了,我们首先用顺序流的方式来实现我们的需求,先获取商品价格和折扣,然后计算折扣以后的价格

public List<String> findPrices(String product) {

	return shops.stream()
			.map(shop -> shop.getPrice(product)) (1)
			.map(Quote::parse) (2)
			.map(DiscountService::applyDiscount) (3)
			.collect(Collectors.toList());
}
1 取得每个shop中商品的原始价格和折扣码
2 在Quote对象中对shop返回的字符串进行转换
3 调用discount服务,为每个Quote申请折扣

你可能能猜到,这种方式的效率是最低的

[shop1 price is 86.823, shop2 price is 20.29, shop3 price is 83.511, shop4 price is 12.807, shop5 price is 7.92, shop6 price is 23.868, shop7 price is 12.502, shop8 price is 30.78, shop9 price is 61.664]
服务耗时:18161

9个商店一共用了18秒,因为每个商店获取价格并打折都会花两秒。我们不再演示并行流的效果,因为这种方式扩展性不好,依赖的是底层的线程数量固定的通用线程池。让我们直接进入 CompletableFuture 实现。

public List<String> findPrices(String product) {

		Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100));

		List<CompletableFuture<String>> priceFuture = shops.stream()
				.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)) (1)
				.map(future -> future.thenApply(Quote::parse)) (2)
				.map(future -> future.thenCompose(
						quote -> CompletableFuture.supplyAsync(() -> DiscountService.applyDiscount(quote), executor))) (3)
				.collect(Collectors.toList();

		return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList()); (4)
}
1 异步方式获取价格
2 前一个Future返回后,对其返回的值进行转换
3 使用另一个异步任务构造期望的Future,申请折扣
4 等待流中所有的Future执行完毕,并提取各自的返回值
async

执行结果:

[shop1 price is 66.462, shop2 price is 80.406, shop3 price is 50.682, shop4 price is 36.195, shop5 price is 31.875, shop6 price is 80.3, shop7 price is 49.23, shop8 price is 25.33, shop9 price is 16.008]
服务耗时:2119
thenApply和thenCompose都接收一个Function实现。在当前CompletableFuture执行完以后,会将结果传给这个Function进行处理。thenApply的Function可以返回任何值,一般用于同步处理。thenCompose的Function必须返回一个CompletableFuture,一般用于连接两个异步处理。这两个方法,还有CompletableFuture中的一些方法,都有一个以Async后缀结尾的版本。通常而言,名称不带Async的方法和他的前一个任务在同一个线程中运行,而名称以Async结尾的方法会将后续的任务提交到一个线程池,所以后续任务是由不同的线程处理的。

5.4.4. 并行多个异步任务

在前面的例子中,我们使用thenCompose将两个异步任务连接起来,第一个任务的返回作为第二个任务的输入。另一种比较常见的情况是,你需要将两个完全不相干的 CompletableFuture 对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务。

这种情况,你应该使用thenCombine方法,它接收名为BiFunction的第二个参数,这个参数定义了当两个 CompletableFuture 对象完成计算后,结果如何合并。同thenCompose方法一样,thenCombine方法也提供了一个Async的版本。如果使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。

回到我们的例子中来,假设我们的商店返回的价格都是欧元,但是你希望用人民币的方式提供给你的客户。你可以在查询商品价格的同时,向一个远程的汇率服务查询欧元和人民币之间的汇率,当这两者都结束时,再将这两个结果结合起来,得到以人民币计价的商品价格。代码如下:

ExchangeService.java
public class ExchangeService {

	public static double getRate(String source, String target) { (1)
		delay();
		return 10;
	}

	public static void delay() {
		try {
			Thread.sleep(1000L);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

}
1 汇率服务,延迟1秒,永远返回10
Shop.java
public Quote getPrice(String product) { (1)
	double price = calculatePrice(product);
	Discount discount = Discount.values()[random.nextInt(Discount.values().length)];
	return Quote.parse(String.format("%s:%.2f:%s", name, price, discount)); (2)
}
1 修改getPrice方法返回值
2 直接返回Quote对象
PriceServce.java
public List<String> findPrices(String product) {

	Executor executor = Executors.newFixedThreadPool(100); (1)

	List<CompletableFuture<String>> priceFuture = shops.stream()
			.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor).thenCombine( (2)
					CompletableFuture.supplyAsync(() -> ExchangeService.getRate("USD", "CNY"), executor), (3)
					(quote, rate) -> new Quote(quote.getShop(), quote.getPrice() * rate, quote.getDiscount()))) (4)
			.map(future -> future.thenCompose(
					quote -> CompletableFuture.supplyAsync(() -> DiscountService.applyDiscount(quote), executor)))
			.collect(Collectors.toList());

	return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
1 不能再使用shop数量做线程数了,因为有独立的异步任务要同时执行,需要更多的线程。
2 第一个任务,查询商品价格
3 第二个任务,查询汇率
4 将商品价格乘以汇率得到人民币价格
async

在第一个map里,同时开了两个线程分别查询价格和汇率,当两个任务都完成后,用这两个异步任务的返回调用BiFunction,然后用BiFunction的结果异步调用第三个任务算折扣。这样,虽然9个商店都调用了3个耗时一秒的远程服务,但由于9个任务都是并行的,而且3个耗时1秒的远程服务中的2个又是并行的,所以,最终的耗时仅有2秒。

执行结果:

[shop1 price is 791.265, shop2 price is 258.39, shop3 price is 557.84, shop4 price is 882.36, shop5 price is 342.475, shop6 price is 112.77, shop7 price is 202.72, shop8 price is 693.04, shop9 price is 357.36]
服务耗时:2105

通过前面两个例子,非常清晰地呈现了相对于采用java 8之前提供的Future实现, CompletableFuture 版本实现所具备的巨大优势。 CompletableFuture 利用lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易的将多个以同步或异步方式执行复杂操作的任务结合到一起。随着处理任务和需要合并结果数目的增加,这种声明式程序设计的优势会愈发明显。

5.4.5. 响应completion事件

本章你看到的所有示例代码都是通过响应之前添加1秒钟的等待延迟模拟方法的远程调用。而在现实世界中,你的应用访问各个远程服务时很可能遭遇无法预知的延迟,触发的原因多种多样,从服务器的负荷到网络的延迟。目前为止,我们实现的findPrices方法只有在取得所有商店的返回值时才显示商品的价格。而我们希望的效果是,只要有商店返回商品价格就在第一时间显示返回值,不再等待哪些还未返回的商店(有些甚至会发生超时)。如何实现这种更进一步的改进需求呢?

你要避免的首要问题是,等待创建一个包含了所有价格的List创建完成。你应该做的是直接处理 CompletableFuture 流,这样每个 CompletableFuture 都在为某个商店执行必要的操作。我们来看下代码:

随机生成0.5秒到2.5秒的延迟
public static void delay() {
	int delay = 500 + random.nextInt(2000);
	try {
		Thread.sleep(delay);
	} catch (Exception e) {
		throw new RuntimeException(e);
	}
}
public void findPrices(String product) {

	long start = System.currentTimeMillis();

	Executor executor = Executors.newFixedThreadPool(100);

	CompletableFuture<?>[] futures = shops.stream()
			.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor).thenCombine(
					CompletableFuture.supplyAsync(() -> ExchangeService.getRate("USD", "CNY"), executor),
					(quote, rate) -> new Quote(quote.getShop(), quote.getPrice() * rate, quote.getDiscount())))
			.map(future -> future.thenCompose(
					quote -> CompletableFuture.supplyAsync(() -> DiscountService.applyDiscount(quote), executor)))
			.map(future -> future.thenAccept(content ->  (1)
					System.out.println(content + " (done in " + (System.currentTimeMillis() - start )+" msecs)")))
			.toArray(size -> new CompletableFuture[size]);

}
1 使用 thenAccept 在每个 CompletableFuture 上注册一个操作,在 CompletableFuture 完成后使用这个操作处理返回值。

由于 thenAccept 方法已经定义了如何处理 CompletableFuture 返回的结果,所以它返回一个 CompletableFuture<Void> 。所以,map操作返回的是一个 String<CompletableFuture<Void>> 。对这个 CompletableFuture<Void> 对象,你能做的事非常有限,只能等待其运行结束,不过一般这也就是你所期望的。

运行结果:

服务耗时:66
shop6 price is 58.96 (done in 1774 msecs)
shop8 price is 374.6 (done in 1924 msecs)
shop2 price is 418.32 (done in 2312 msecs)
shop5 price is 647.53 (done in 2648 msecs)
shop1 price is 588.37 (done in 2833 msecs)
shop9 price is 405.08 (done in 2947 msecs)
shop3 price is 528.03 (done in 3193 msecs)
shop7 price is 155.515 (done in 3340 msecs)
shop4 price is 75.42 (done in 3344 msecs)

我们看到,由于随机延迟的效果,最快的价格查询用时只有最慢的查询的一半。而我们在最快的查询完成时就立刻做出了处理,而不是等到所有查询全完成以后才处理。

另外,我们看到的服务耗时是66毫秒,这是因为主线程在把任务分配给执行线程以后就立即返回了。那么如果你需要在所有商店全都处理完毕以后做一些处理怎么办呢?

public void findPrices(String product) {

	long start = System.currentTimeMillis();

	Executor executor = Executors.newFixedThreadPool(100);

	CompletableFuture<?>[] futures = shops.stream()
			.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor).thenCombine(
					CompletableFuture.supplyAsync(() -> ExchangeService.getRate("USD", "CNY"), executor),
					(quote, rate) -> new Quote(quote.getShop(), quote.getPrice() * rate, quote.getDiscount())))
			.map(future -> future.thenCompose(
					quote -> CompletableFuture.supplyAsync(() -> DiscountService.applyDiscount(quote), executor)))
			.map(future -> future.thenAccept(content ->
					System.out.println(content + " (done in " + (System.currentTimeMillis() - start )+" msecs)")))
			.toArray(size -> new CompletableFuture[size]);

	CompletableFuture.allOf(futures).thenAccept((obj) -> System.out.println("all shop return results or time out")); (1)

}
1 所有 CompletableFuture 处理完毕后打印一句话。

allOf方法接收一个由 CompletableFuture 组成的数组,数组中的所有 CompletableFuture 对象执行完成之后,它返回一个 CompletableFuture<Void> 对象。你只需要对这个对象调用thenAccept方法就可以在所有 CompletableFuture 处理完毕后执行一些操作。

服务耗时:67
shop2 price is 439.04 (done in 1669 msecs)
shop4 price is 76 (done in 1825 msecs)
shop9 price is 654.03 (done in 2306 msecs)
shop7 price is 339.91 (done in 2356 msecs)
shop1 price is 593.28 (done in 2514 msecs)
shop6 price is 590.64 (done in 3034 msecs)
shop8 price is 391.52 (done in 3078 msecs)
shop5 price is 812.97 (done in 3170 msecs)
shop3 price is 836.095 (done in 3495 msecs)
all shop return results or time out

而在另一些场景中,你可能希望只要 CompletableFuture 数组中的任何一个执行完毕就不再等待,比如,你正在同时查询两个汇率服务器,任何一个返回了结果都能满足你的需求。在这种情况下,你可以使用一个类似的工厂方法anyOf。该方法接收一个由 CompletableFuture 组成的数组,返回由第一个执行完的 CompletableFuture 对象的返回值构成的 CompletableFuture<Object>

public void findPrices(String product) {

		long start = System.currentTimeMillis();

		Executor executor = Executors.newFixedThreadPool(100);

		CompletableFuture<?>[] futures = shops.stream()
				.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor).thenCombine(
						CompletableFuture.supplyAsync(() -> ExchangeService.getRate("USD", "CNY"), executor),
						(quote, rate) -> new Quote(quote.getShop(), quote.getPrice() * rate, quote.getDiscount())))
				.map(future -> future.thenCompose(
						quote -> CompletableFuture.supplyAsync(() -> DiscountService.applyDiscount(quote), executor)))
				.toArray(size -> new CompletableFuture[size]);

		CompletableFuture.anyOf(futures).thenAccept((content) -> System.out.println(
				"fastest shop : " + content + " (done in " + (System.currentTimeMillis() - start) + " msecs)")); (1)

}
1 第一个 CompletableFuture 处理完毕后打印一句话。

运行结果:

服务耗时:68
fastest shop : shop4 price is 379.185 (done in 1837 msecs)

5.5. Spring中的异步处理

从2.0开始,spring引入了一个新的Executors抽象。Executors是Jdk1.5引入的新概念,一般情况下指的是线程池。而使用Executor这个名字,则是因为底层的实现有时并不真正是一个池,一个Executor可能是单线程的,甚至可能是同步的。

Spring的 TaskExecutor 接口与jdk的 java.util.concurrent.Executor 接口是完全一样的。接口只有一个单一的方法 execute(Runnable task) ,接收一个任务,然后基于底层配置的线程池来执行。

TaskExecutor 一开始是为了给其它需要线程池的Spring组件提供一层抽象而创建的,比如 ApplicationEventMulticaster , JMS的 AbstractMessageListenerContainer .然而,如果你自己的bean也需要线程池行为,你也可以根据自己的需要使用这个抽象。

5.5.1. TaskExecutor的类型

在Spring里内建了多个 TaskExecutor 接口的实现,无论什么情况,你应该永远都不需要提供自己的实现。

  • SimpleAsyncTaskExecutor 这个实现不重用任何线程,而是为每次调用都启动一个新的线程。它支持并发数量限制,当达到限制时,任何新的调用将会被阻塞,直到有新的位置空闲出来。 注意,它并不是一个真正的池。

  • SyncTaskExecutor 这个实现不执行任何的异步调用,每个调用都会被在主线程中处理。主要用于单元测试之类不真正需要多线程的场景。

  • ConcurrentTaskExecutor 这个实现是 java.util.concurrent.Executor 的适配器,极少会用到。

  • SimpleThreadPoolTaskExecutor 这个实现实际上是Quartz的 SimpleThreadPool 的子类。当你需要一个能在Quartz组件和非Quartz组件间共享的线程池时,使用此类。

  • ThreadPoolTaskExecutor 这是最常用的一个实现。它把配置 java.util.concurrent.ThreadPoolExecutor 所需的属性暴露为bean的属性,并将其包装为一个 TaskExecutor

  • WorkManagerTaskExecutor 不要用。

5.5.2. 使用TaskExecutor

配置
@Configuration
public class Config {

	@Bean
	public TaskExecutor taskExecutor() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); (1)
		taskExecutor.setCorePoolSize(10); (2)
		taskExecutor.setMaxPoolSize(25); (3)
		taskExecutor.setQueueCapacity(10); (4)
		return taskExecutor;
	}

}
1 实例化ThreadPoolTaskExecutor
2 设置核心线程数
3 设置最大线程数
4 设置队列容量

当有新的任务添加到 ThreadPoolTaskExecutor 时,如果池中的线程数量未达到 corePoolSize,则立即创建新的线程来执行任务。如果已经到达corePoolSize,则放入队列,如果队列达到容量限制,则创建新的线程来执行任务,直到线程数达到maxPoolSize。这时再提交新的任务,任务会被拒绝,然后会使用rejectedExecutionHandler属性指定的Handler来处理这些被拒绝的任务,默认的策略是抛出RejectedExecutionException。

只有在队列满了以后才会创建多于corePoolSize数量的线程,而queueCapacity默认为Integer.MAX_VALUE,所以,如果你光指定maxPoolSize而不指定queueCapacity的话,是毫无意义的。

使用
@Component
public class TestServiceImpl implements TestService {

	@Autowired
	private TaskExecutor taskExecutor; (1)

	public void handle() {
		for (int i = 0; i < 100; i++) {
			int index = i;
			taskExecutor.execute(() -> System.out.println(index)); (2)
		}
	}
}
1 引入 TaskExecutor
2 TaskExecutor 添加任务, TaskExecutor 会决定何时以及如何执行任务。

5.5.3. @Async注解

Spring还提供了一些注解来支持任务的异步执行。为了使用@Async注解,你应该在你的配置类上添加@EnableAsync注解。

@Async注解可以添加在一个方法上,然后这个方法的调用将被异步执行。换句话说,调用有@Async注解的方法时,方法会立即返回,而方法的执行过程将被放在一个任务中提交给一个Spring的TaskExecutor.此注解可以用在无返回的方法上,也可以用在有返回值的方法上,但是,返回值只能是Future类型的,以便调用者使用get方法来获取异步执行的返回结果。

合法的异步注解
@Async
void doSomething(String s) {
    // this will be executed asynchronously
}

@Async
Future<String> returnSomething(int i) {
    // this will be executed asynchronously
}
除了标准的Future外,返回值也可以是Spring的 ListenableFuture 或者Jdk8的 CompletableFuture
@Async不能和@PostConstruct一起用

默认情况下,当一个@Async注解的方法被调用时,它会在spring中寻找一个Executor的实现并将方法的执行任务提交给它。如果spring容器中只有一个Executor实现,那么就直接用它,如果spring容器中有多个Executor实现,那么默认会使用名为taskExecutor的实现,如果所有实现bean的名字都不是taskExecutor,则抛出异常。当spring容器中有多个Executor实现时,你可以通过指定@Async的value属性来指定使用哪个Executor实现来异步执行方法。

@Async("otherExecutor") (1)
void doSomething(String s) {
    // this will be executed asynchronously by "otherExecutor"
}
1 使用名字为 otherExecutor 的 TaskExecutor 来异步处理 doSomething方法。

5.6. SpringMVC中的异步处理

SpringMVC的异步请求处理只能在Servlet3容器中生效。

从Spring MVC 3.2开始,Controller方法可以返回一个Callable,并且从一个被Spring容器管理的线程中异步的产生返回值。Servlet容器主线程会被释放,然后处理其它请求。Spring MVC 会在 TaskExecutor 中使用一个单独的线程处理 Callable,当Callable返回时,请求会被发送回Servlet容器并处理Callable的返回值。这个处理对前台来说是透明的,如果Callable处理需要1秒,那么前台在发出请求后仍然需要等待一秒,然后才能获取到结果,但是对后台Servlet容器来说,并不会占用一个线程1秒,一开始处理请求的线程,在这1秒内,可以处理很多其它的请求。这种能力,可以大大提高Servlet容器的吞吐量。

@PostMapping
public Callable<String> processUpload(final MultipartFile file) {

    return new Callable<String>() {
        public String call() throws Exception {
            // ...
            return "someView";
        }
    };

}

除了返回Callable外,Controller也可以返回一个 DeferedResult 的实例。这种情况下, 返回值将可以由任何线程产生,比如JMS消息,或者另一个Http请求。

@RequestMapping("/quotes")
public DeferredResult<String> quotes() {
    DeferredResult<String> deferredResult = new DeferredResult<String>();
    // 将deferredResult保存到某处
    return deferredResult;
}

// 在其它线程中
deferredResult.setResult(data);

6. 服务网关

我们现在已经有了几个微服务应用,而真实世界中的微服务系统,往往由几十上百个应用组成,这时我们会面对以下这些问题:

  1. 每个应用都会有一些相同的需求,比如身份认证,服务调用日志,流量控制等。为了避免每个应用重复开发这些功能,我们需要一个集中的地方统一处理这些需求。

  2. 外部应用调用微服务时需要一个统一的入口,因为服务数量众多,而且会随着业务而变化,我们需要对外屏蔽这些信息和变化,让外部系统针对一个统一的入口来开发。

服务网关就是为了解决这些问题而产生的。

6.1. 基于Spring Cloud Zuul搭建网关

创建server-zuul项目

pom.xml
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-zuul</artifactId>
</dependency>
ZuulServer.java
@SpringBootApplication
@EnableZuulProxy (1)
public class ZuulServer {

	public static void main(String[] args) {
		SpringApplication.run(ZuulServer.class, args);
	}

}
1 声明本应用是zuul网关应用
application.yml
spring:
  application:
    name: server-zuul
eureka:
  instance:
    preferIpAddress: true
  client:
    registerWithEureka: true
    fetchRegistry: true
    serviceUrl:
      defaultZone: http://127.0.0.1:8761/eureka/
server:
  port: 8062
endpoints:
  sensitive: false (1)
1 zuul的路由信息是通过actuator端点提供的,而actuator端点默认是需要身份认证的,这里是关闭actuator端点的身份认证

因为zuul需要从eureka获取服务信息,所以需要添加eureka服务器的相关配置。

6.2. 配置Zuul静态路由

默认情况下,zuul会根据eureka的信息自动生成路由配置,我们把zuul启动起来,然后访问 http://127.0.0.1:8062/routes

默认路由配置
Figure 17. 默认路由配置

我们可以看到,我们的微服务已经被自动映射到相应名字的路径上,比如我们现在就可以通过访问 http://127.0.0.1:8062/commodity-service/products/1 这个路径来访问商品应用的商品信息服务。

我们可以手工配置来控制路由规则。

application.yml
zuul:
  ignored-services: 'commodity-service' (1)
  prefix: /api (2)
  routes:
    commodity-service: '/commodity/**' (3)
1 commodity-service不自动生成路由配置,如果写'*',会忽略掉所有服务的自动路由配置
2 所有的服务请求路径以/api做前缀
3 /commodity/**的所有请求,转发给commodity-service服务。

这样配置后,我们要访问商品信息服务,需要访问路径 http://127.0.0.1:8062/api/commodity/products/1

6.3. 超时时间配置

默认情况下,zuul会使用hystrix和ribbon来做熔断和负载均衡。

hystrix的默认超时时间是1秒,所以zuul会打断所有处理时间超过1秒的请求并返回500

可以通过以下配置项配置hystrix的全局超时时间。

hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds: 3000 (1)
1 设置hystrix默认超时时间为3秒

你也可以通过以下配置指定某个具体的服务应用的超时时间

hystrix.command.commodity-service.execution.isolation.thread.timeoutInMilliseconds: 2000 (1)
1 设置commodity-service应用的hystrix超时时间为2秒

ribbon的默认超时时间是5秒,所以如果你要设置的超时时间超过5秒,还要同时设置ribbon的超时时间

ribbon.ReadTimeout: 7000 (1)
commodity-service.ribbon.ReadTimeout: 6000 (2)
1 设置ribbon的全局超时时间为7秒
2 设置commodity-service应用的ribbon的超时时间为6秒

6.4. zuul真正的力量:Filter

使用Filter进行认证,动态路由,请求重写等等

7. 配置中心

前面的所有配置,在配置中心里都已经提供了标准的配置namespace,在你的项目中直接引用即可,不需要自己配置。

随着程序功能的日益复杂,程序的配置日益增多:各种功能的开关、参数的配置、服务器的地址…​

对程序配置的期望值也越来越高:配置修改后实时生效,灰度发布,分环境、分集群管理配置,完善的权限、审核机制……

当我们是一个单机服务时,我们的配置通常写在一个文件中的,代码发布的时候,把配置文件和程序打包推送到机器上去。

但是,一个微服务架构的系统,部署的节点往往是以百记的,这时,当一个配置发生变化时,我们可能需要重新编译、打包、部署几十台机器。这样去部署配置简直是一场噩梦。

在这样的环境下,传统的通过配置文件、数据库等方式已经越来越无法满足开发人员对配置管理的需求。配置中心应运而生.

Spring Cloud 自带的配置中心基于Git来管理配置,缺乏很多企业级场景所需的功能,而且没有经过大规模应用的实际生产环境考验。进过一些对比后,我们选了携程开源的apollo作为配置中心的技术实现。
本文档只描述如何接入apollo,apollo的功能和概念请参考apollo官方网站

7.1. 创建项目

登录http://apollo.zkh360.com/,用户名密码钉钉上找“翟亮”要。

登录
Figure 18. 登录
创建项目
Figure 19. 创建项目
  1. 应用id为6位数字技术部的项目001开头,信息部的项目002开头。后3位自己定。

项目创建好后就可以添加配置了,在添加以前,让我们回头看一下目前项目的配置文件。

7.2. 添加配置项

我们现在有3个项目,商品,订单和购物车。

在这些项目中,配置项可以分成2类。

  1. 所有项目都通用的配置项,主要是一些基础设施的配置。比如eureka服务器的地址,rabbitmq服务器的地址等。

  2. 项目特有的配置项,比如项目的名称,消息通讯时入栈渠道和出栈渠道的配置等。

第一种配置通过关联开发框架配置的Namespace来引入,不需要每个项目自己配置,Namespace的名字是info.app.atlas.framework

第二种配置直接添加在项目默认创建的Namespace(application)里即可。如果已有本地的properties文件,直接用文本方式把配置copy过去即可。

添加配置项
Figure 20. 添加配置项
文本模式
Figure 21. 文本模式

7.3. 应用接入

首先修改本地maven的settings,使其默认指向公司maven私服。

settings.xml
<servers>
	<server>
		<id>zkh-nexus</id>
		<username>admin</username>
		<password>zkh136@!#uy</password>
	</server>
</servers>
<mirrors>
	<mirror>
		<id>zkh-nexus</id>
		<name>zkh-nexus-server</name>
		<url>http://mvn.zkh360.com/nexus/content/groups/public/</url>
		<mirrorOf>*</mirrorOf>
	</mirror>
</mirrors>

在commodity-app项目的pom中加入开发框架依赖

pom.xml
<dependency>
	<groupId>com.zkh360.framework</groupId>
	<artifactId>zaf-core</artifactId>
	<version>1.3.0-RELEASE</version>
</dependency>

在项目的src/main/resources下新建META-INF文件夹,添加app.properties;

app.properties
app.id=001001 (1)
1 在apollo里创建项目时写的项目id.

删掉原来的配置文件,如application.properties。新建bootstrap.properties

bootstrap.properties
apollo.bootstrap.enabled = true (1)
apollo.bootstrap.namespaces = application,info.app.atlas.framework (2)
1 启用配置服务器
2 从配置服务器读入的Namespace

在commodity-app项目启动类的启动配置里加入虚拟机变量evn,值为dev

启动变量
Figure 22. 启动变量
环境信息不要写在代码里,而是在运行时由外部传入,这样可以保持所有环境下的部署物一致,减少部署工作量。

OK,现在启动commodity-app时,应用会从配置服务器读入配置。

8. 环境说明

到现在为止,在微服务环境下开发、发布并访问服务的所有组件都已经具备了。虽然这只是一个最基础的环境,还缺乏很多组件,比如认证授权,各种监控等等。但是我们已经可以进行开发、测试等工作了。

为了方便项目组的开发、测试等工作,避免每个项目组都要搭建自己的环境,我们提供了统一的开发、测试、生产环境。各个项目组只要把自己的服务开发好并发布上来即可。

目前在逻辑上有4套环境,DEV,FAT,UAT,PRO:

DEV环境用于开发人员本地运行服务,以DEV环境参数运行的服务,不会注册到注册中心,也就是说,其他服务开发者不会调用到开发人员本地机器运行的服务。

FAT环境用于将服务发布给其它开发人员调用,FAT环境与DEV环境共用一套物理设备和基础服务,如注册中心、网关等。区别在于,以FAT环境参数运行的服务,会注册到注册中心。

UAT环境用于用户测试,独立环境。

PRO环境用于生产,每个服务至少要部署两个实例。

8.1. 环境信息

开发环境 测试环境 生产环境

机器列表

app-fat-1

app-uat-1

  • app-pro-1

  • app-pro-2

注册中心

http://config.dev.zkh360.com

http://config.uat.zkh360.com

http://config.pro.zkh360.com

服务网关

http://gateway.fat.zkh360.com

http://gateway.uat.zkh360.com

http://gateway.zkh360.com

消息队列控制台

http://mqadmin.dev.zkh360.com

http://mqadmin.uat.zkh360.com

http://mqadmin.pro.zkh360.com

数据库(mysql)

  • 外网地址:db.dev.zkh360.com:50003 apollo/apollo.1234

  • 内网地址:10.10.0.202:3306 apollo/apollo.1234

  • 外网地址:db.uat.zkh360.com:50004 apollo/apollo.1234

  • 内网地址:10.10.0.203:3306 apollo/apollo.1234

请联系运维

缓存服务(redis)

  • 外网地址:cache.dev.zkh360.com:50005 redis.1234

  • 内网地址:10.10.0.202:6379 redis.1234

  • 外网地址:cache.uat.zkh360.com:50006 redis.1234

  • 内网地址:10.10.0.203:6379 redis.1234

请联系运维

消息队列(rabbitmq)

  • 外网地址:amqp://mq.dev.zkh360.com:50017 zkhdev/zkhdev

  • 内网地址:10.10.0.204:5672 zkhdev/zkhdev

  • 外网地址:amqp://mq.uat.zkh360.com:50050 zkhuat/zkhuat

  • 内网地址:10.10.0.205:5672 zkhuat/zkhuat

请联系运维

9. 认证和授权

9.1. 软件架构

目前公司的域名解析有点问题,如果遇到域名无法访问的话,可以多刷新几次试下。
OAuth2
Figure 23. OAuth2

我们将基于 OAuth2(开放认证)规范来构建安全中心。 OAuth2 是基于令牌的安全规范,允许用户使用第三方认证服务验证自己。如果用户成功认证,他们将获得一个令牌,该令牌必须与每个请求一起发送。然后可以将令牌返回给身份验证服务验证。

9.2. 接入安全中心

访问 http://security.fat.zkh360.com/manage.html 注意链接中的fat。不同的环境这个值不同。 使用公司的域账户登录即可

登录后左面菜单点应用管理,填入应用信息后保存。

应用信息
Figure 24. 应用信息

9.3. Spring Boot应用

本章描述SpringBoot应用的集成方式,非SpringBoot应用请参照后续章节。

请确保你的代码在com.zkh360包或其子包下。并确保你的启动类在com.zkh360包里。

在pom中增加以下依赖:

pom.xml
<dependency>
	<groupId>com.zkh360.security</groupId>
	<artifactId>zaf-security-client</artifactId>
	<version>1.1.4-RELEASE</version>
</dependency>

此依赖已包含在zaf-core-1.3.0-RELEASE的依赖中,如果你引用了zaf-core-1.3.0-RELEASE,可以不用单独引用此依赖。

在配置中心为你的应用增加以下配置:

application.properties
security.oauth2.client.clientId = 001001 (1)
security.oauth2.client.clientSecret = 123456 (2)
1 创建应用时填入的clientId
2 创建应用时填入的clientSecret

9.3.1. 通过单点登录认证身份

在配置中心覆盖info.app.atlas.framework的配置项zaf.security.mode,将值改成sso

zaf.security.mode=sso

在这个模式下,如果在未经过身份认证的情况下,访问需要身份认证后才可访问的资源时,会跳到登录服务器登录,登录成功后会再跳转回应用。

安全中心统一登录
Figure 25. 安全中心统一登录

登录用的用户名密码目前需要在安全中心维护。

如果创建应用时选的可以登录的用户类型是震坤行员工,那么使用公司的域账户即可登录。

用户只要在接入安全中心的某一个应用登录过,即可以相同的身份访问不同应用的资源。

9.3.2. 获取访问令牌

如果你的应用只是个微服务提供者,没有用户页面,那么需要访问者获取访问令牌才可以访问,访问令牌的获取方式如下:

应用向认证服务器发送以下请求:

获取token
Figure 26. 获取token
  1. 请求地址为 https://auth.fat.zkh360.com/server/oauth/token

  2. 请求method为 POST

  3. 请求头中的 Authorization 是用 clientId和clientSecret Base64加密后的串,具体格式为 'Basic ${base64( ${clientId} + ':' + ${clientSecret} )}'。 具体可以百度 Http Basic 认证。

  4. 参数grant_type,固定传 password。

  5. 参数scope 固定传 all。

  6. 参数username,安全中心中某个账户的用户名。

  7. 参数password, 安全中心中某个账户的密码。

返回的响应如下:

token
Figure 27. response
  1. access_token 访问令牌。

  2. token_type 令牌类型

  3. refresh_token 刷新令牌。

  4. expires_in 访问令牌过期时间。创建应用时设置。

  5. scope 授权范围

  6. jti JWT唯一标识.

9.3.3. 获取用户信息

不管你的应用是通过单点登录认证身份(有页面),还是只提供服务,需要别人携带令牌访问(无页面),在你的应用中都可以使用以下方式获取发送请求的用户名

Test.java
@GetMapping("/greeting")
public SimpleResponse<String> greeting(@AuthenticationPrincipal String user) throws Exception { (1)
	return new SimpleResponse<>("hello "+user);
}
1 用AuthenticationPrincipal注解获取登录用户的用户名。

使用访问令牌,也可以通过向微服务网关发送以下请求获取用户信息:

token
Figure 28. me
  1. 请求地址为 https://gateway.fat.zkh360.com/service-security/accounts/me

  2. 请求头中的 Authorization 为 'bearer ${access_token}'

返回的响应如下:

token
Figure 29. response

principal 为用户名。

9.3.4. 传递令牌

如果你的应用是通过单点登录认证身份的,然后你需要在应用中访问微服务,这时你需要获取当前登录用户对应的令牌,并在调用微服务时携带这个令牌。

为了解决这个问题,我们在zaf-security-client里注册了一个OAuth2RestTemplate实例,你可以@Autowired这个实例,然后用其发起微服务调用,这个实例,会在发送请求前,获取当前登录用户对应的令牌并加到请求头中。

9.3.5. 控制访问权限

在引入zaf-security-client以后,除了后缀为 .js, .css, .jpg, .png, .gif, .woff2, .woff, .ttf 的GET请求外,其它所有的请求都需要经过身份认证才可以访问。

身份认证有两种方式,单点登录和获取令牌,前面已经介绍过,分别对应有页面的应用和没页面的应用。

应用系统可以通过向Spring容器中增加 AuthorizeConfigProvider 接口的实现来配置不需要身份认证即可访问URL.

demo.java
@Component (1)
public class DemoConfigProvider implements AuthorizeConfigProvider { (2)

	/* (non-Javadoc)
	 * @see com.imooc.security.core.authorize.AuthorizeConfigProvider#config(org.springframework.security.config.annotation.web.configurers.ExpressionUrlAuthorizationConfigurer.ExpressionInterceptUrlRegistry)
	 */
	@Override
	public boolean config(ExpressionUrlAuthorizationConfigurer<HttpSecurity>.ExpressionInterceptUrlRegistry config) {
        config.antMatchers(HttpMethod.GET, "/api/**").permitAll() (3)
              .mvcMatchers(HttpMethod.POST, "/user/{id}").permitAll() (4)
              .regexMatchers(HttpMethod.GET, "/^[0-9]*$").permitAll(); (5)
        return false; (6)
    }

}
1 声明为Spring容器Bean
2 继承AuthorizeConfigProviderAdapter接口
3 指定不需要身份认证的URL,使用 ant pattern 语法
4 指定不需要身份认证的URL,使用 spring mvc 语法
5 指定不需要身份认证的URL,使用正则表达式语法。
6 固定返回false
3种指定URL的方法,都有一个不需要指定HttpMethod的重载。

针对需要身份认证才能访问的这些URL,如果需要更细粒度的权限控制,比如:经理可以管理用户信息,普通员工只能查看用户信息。需要在配置中心中覆盖info.app.atlas.framework的以下配置项开启权限控制。

zaf.security.authorize.enable=true

这时又有两种情况,一种是系统自己有权限控制体系。一种是使用安全中心提供的标准的基于角色的权限控制。

先说第一种。

如果应用自己有操作权限控制体系,可以向Spring容器中注入一个RbacService接口的实现,把自己的操作权限控制逻辑接入zaf-security-client。

DemoRbacService.java
@Component("rbacService") (1)
@Primary (2)
public class DemoRbacService implements RbacService { (3)

	/* (non-Javadoc)
	 * @see com.zkh360.zaf.security.client.authorize.RbacService#hasPermission(javax.servlet.http.HttpServletRequest, org.springframework.security.core.Authentication)
	 */
	@Override
	public boolean hasPermission(HttpServletRequest request, Authentication authentication) { (4)
		boolean result = check(request, authentication) (5)
		return result; (6)
	}

}
1 声明一个名为rbacService的Bean
2 由于zaf-security-client自带一个默认的RbacService接口实现,所以这里需要用@Primary注解让Spring优先使用我们自定义的RbacService
3 实现RbacService接口
4 实现hasPermission方法,request参数是要判断权限的请求,authentication参数是当前用户的认证信息。应用可以通过这两个参数判断当前用户是否有权访问此请求。
5 自定义判断的逻辑
6 返回判断结果,如果为false,zaf-security-client将拒绝当前请求,并返回403状态码。

如果应用没有操作权限控制,可以在安全中心里创建资源和角色,为用户分配角色并管理其权限。

权限管理的规则如下:

  1. 一种用户类型对应一组角色

  2. 同一个角色在不同应用中的资源权限不同

  3. 一个资源可以配置多个有权访问的URL,配置有权访问的URL时须使用ant pattern语法

  4. 一个用户可以有多个角色

  5. 用户最终权限是所有角色权限的并集

一般权限维护的步骤如下:

维护应用的登录用户类型
Figure 30. 维护应用的登录用户类型
创建对应类型的角色
Figure 31. 创建对应类型的角色

如果角色已存在,使用现有角色即可

维护账号和角色的对应关系
Figure 32. 维护账号和角色的对应关系
在应用中创建资源
Figure 33. 在应用中创建资源

资源的权限URL可以填多个,可以按ant模式来填,比如 /apps/**. 当用户所在的角色拥有这个资源时,那这个用户就可以访问所有/apps开头的url

指定角色在应用中的资源权限
Figure 34. 指定角色在应用中的资源权限

经过以上配置,zaf-security-client在收到请求时,就会根据当前用户信息,判断用户是否有权在某个应用中访问某个URL.

应用也可以向服务网关发送请求,获取当前用户的权限信息:

获取用户权限
Figure 35. 获取用户权限

返回的响应如下:

获取用户权限
Figure 36. 获取用户权限

返回的是一个树形结构的数据,因为用户的token是针对某一个应用发放的,所以,只会返回用户在这个应用中的资源权限。

9.3.6. 退出登录

退出登录时需要在认证服务器和应用本地同时退出。

访问应用的 /logout/url ,会获得一个退出地址,访问这个地址,即可在认证服务器和应用本地同时退出。

9.4. 非Spring Boot应用

本章描述非SpringBoot应用的集成方式,SpringBoot应用请参照前一章节。

非Spring Boot应用不需要集成jar包。

9.4.1. 通过单点登录认证身份

需要登录时,向认证服务器发起以下授权请求:

请求地址:

https://auth.fat.zkh360.com/server/oauth/authorize

请求参数:

参数名称 是否必填 参数描述

client_id

接入应用中心时填的client_id

redirect_uri

授权码回调地址

response_type

固定填code

state

状态码,任意字符串,回调时会原样传回。

认证服务器接收到此请求后,会判断用户是否已经在认证服务器登录,如果未登录,则跳到登录页面让用户登录。

安全中心统一登录
Figure 37. 安全中心统一登录

登录用的用户名密码目前需要在安全中心维护。

如果创建应用时选的可以登录的用户类型是震坤行员工,那么使用公司的域账户即可登录。

用户只要在接入安全中心的某一个应用登录过,即可以相同的身份访问不同应用的资源。

当用户登录成功(或已经登录)后,认证服务器会回调上面请求中redirect_uri参数指定的地址,并携带以下参数

参数名称 是否必填 参数描述

code

用户授权码

state

状态码,如果授权请求中此参数有值,则原样返回。

应用需要在回调地址中接收以上参数,并重新发起另一个请求来获取访问令牌。

获取令牌
Figure 38. 获取令牌
  1. 请求地址: https://auth.fat.zkh360.com/server/oauth/token

  2. 请求方法: POST

  3. 请求头中的 Authorization 是用 clientId和clientSecret Base64加密后的串,具体格式为 'Basic ${base64( ${clientId} + ':' + ${clientSecret} )}'。 具体可以百度 Http Basic 认证。

  4. 参数grant_type,固定传 authorization_code

  5. 参数code 认证服务器回调时携带的参数,在这个请求中再原样传上来。

  6. 参数redirect_uri,需要与授权请求中的redirect_uri参数一致。

返回的响应如下:

token
Figure 39. response
  1. access_token 访问令牌。

  2. token_type 令牌类型

  3. refresh_token 刷新令牌。

  4. expires_in 访问令牌过期时间。创建应用时设置。

  5. scope 授权范围

  6. jti JWT唯一标识.

向微服务网关发送以下请求获取用户信息:

token
Figure 40. me
  1. 请求地址为 https://gateway.fat.zkh360.com/service-security/accounts/me

  2. 请求头中的 Authorization 为 'bearer ${access_token}'

返回的响应如下:

token
Figure 41. response

principal 为用户名。

应用使用此用户名在本地登录即可。

9.4.2. 获取访问令牌

  1. 完成上面的流程即可获得访问令牌。

  2. 使用SpringBoot章节介绍的password模式也可以获取令牌,不过这种情况应该不会发生,因为password模式只用于微服务应用,而微服务应用都应该是SpringBoot写的。

9.4.3. 获取用户信息

  1. 完成上面的流程即可获得用户信息。

9.4.4. 权限控制

如果应用有自己的权限控制体系,登录成功以后用自己的权限体系即可。

如果使用安全中心提供的统一权限控制,在安全中心配置完权限后(具体配置参见SpringBoot应用章节),由于非SpringBoot应用本地没有引入客户端,所以需要自己发送请求获取权限信息(获取方式参见SpringBoot应用章节),然后自己根据权限信息来判断用户是否有权访问某个URL

9.4.5. 退出登录

退出登录时需要在认证服务器和应用本地同时退出。

退出地址的样式如下:

https://auth.fat.zkh360.com/server/logout?redirectUri=${应用本地的退出服务地址}

访问上面的url后,首先会在认证服务器退出,然后跳转到应用本地的退出服务地址,在应用本地退出。

9.5. 加密解密

9.5.1. 数据库密码加解密

数据库密码加解密基于阿里巴巴的druid数据源来实现。实现过程分为两步,一步由数据库管理员操作,另一步由开发人员实现,现说明如下:

数据库管理员的工作:

在命令行中执行如下命令:

java -cp druid-1.1.12.jar com.alibaba.druid.filter.config.ConfigTools you_password

you_password为数据库密码明文。

命令行的返回如下:

output.txt
 privateKey:MIIBVgIBADANBgkqhkiG9w0BAQEFAASCAUAwggE8AgEAAkEA6+ (1)
 publicKey:MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJB (2)
 password:PNak4Yui0+2Ft6JSoKB (3)
1 privateKey:私钥,数据库管理员自己保存
2 publicKey: 公钥,发给开发
3 password: 数据库密码密文,发给开发
开发人员的工作:

在pom.xml中加入以下依赖:

pom.xml
<dependency>
     <groupId>com.alibaba</groupId>
     <artifactId>druid</artifactId>
     <version>1.1.12</version>
</dependency>

使用druid作为数据源并配置如下:

config.xml
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"
     init-method="init" destroy-method="close">
     <property name="url" value="jdbc:derby:memory:spring-test;create=true" />
     <property name="username" value="sa" />
     <property name="password" value="${password}" /> (1)
     <property name="filters" value="config" /> (2)
     <property name="connectionProperties" value="config.decrypt=true;config.decrypt.key=${publickey}" /> (3)
</bean>
1 ${password}处输入管理员发送的数据库密码密文
2 在过滤器链中加入 config 过滤器
3 ${publickey}处输入管理员发送的公钥

9.5.2. 敏感信息加解密

使用加密解密工具需要首先向安全中心注册你的应用,具体方法见前面。因为每个应用的秘钥都不同。

在pom中加入以下依赖。

pom.xml
<dependency>
	<groupId>com.zkh360.security</groupId>
	<artifactId>zaf-security-client</artifactId>
	<version>1.1.0-RELEASE</version>
</dependency>

加解密代码:

Test.java
KmsUtils utils = new KmsUtils(clientId, clientSecret); (1)
String ciphertext = utils.encrypt("13011056600"); (2)
String plaintext = utils.decrypt(ciphertext); (3)
1 使用在安全中心注册应用时填写的clientId和clientSecret初始化工具
2 加密
3 解密
这个工具类是线程安全的,可以声明为Spring的Bean,然后在需要的地方注入使用即可。

10. 保护你的微服务

10.1. 限流

10.2. 隔离

在分布式系统中,通常会有很多依赖,如下图:

dep
Figure 42. dep

在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等.

如下图:QPS为50的依赖 I 出现不可用,但是其他依赖仍然可用.

dep
Figure 43. dep

当依赖I 阻塞时,上层的应用服务器(tomcat)的线程池就会以每秒50的速度阻塞(BLOCK),在几秒内,线程池就会完全占满, 影响整个线上服务的稳定性.如下图:

dep
Figure 44. dep

在复杂的分布式架构的应用程序中有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。

例如:一个依赖30个SOA服务的系统,每个服务99.99%可用。 99.99%的30次方 ≈ 99.7%,0.3% 意味着一千万次请求,会有3万次失败,换算成时间大约每月有2个小时服务不稳定.这是不可接受的。

随着服务依赖数量的变多,服务不稳定的概率会成指数性提高。

解决问题的方案:使用Netflix提供的Hystrix组件对依赖做隔离。

Hystrix依赖的隔离架构,如下图:

dep
Figure 45. dep

Hystrix提供了两种隔离方式

线程隔离

Hystrix在用户请求和服务之间加入了线程池。用户的请求将不再直接访问服务,而是通过线程池中的空闲线程来访问服务,如果线程池已满调用将被立即拒绝,默认不采用排队.加速失败判定时间。线程数是可以被设定的。

信号隔离

信号隔离也可以用于限制并发访问,防止阻塞扩散, 与线程隔离最大不同在于执行依赖代码的线程依然是请求线程(该线程需要通过信号申请), 如果客户端是可信的且可以快速返回,可以使用信号隔离替换线程隔离,降低开销。信号量的大小可以动态调整, 线程池大小不可以。

10.3. 熔断

如果某个目标服务调用慢或者有大量超时,此时,熔断该服务的调用,对于后续调用请求,不在继续调用目标服务,直接返回,快速释放资源。如果目标服务情况好转则恢复调用。

熔断器是位于线程池(隔离)之前的组件。用户请求某一服务之后,Hystrix会先经过熔断器,此时如果熔断器的状态是打开(跳起),则说明已经熔断,这时将直接进行降级处理,不会继续将请求发到线程池。熔断器相当于在线程池之前的一层屏障。每个熔断器默认维护10个bucket ,每秒创建一个bucket ,每个blucket记录成功,失败,超时,拒绝的次数。当有新的bucket被创建时,最旧的bucket会被抛弃。

熔断器的原理是一个3个状态的状态机,如下图所示:

breaker
Figure 46. breaker
关闭状态

调用失败次数积累,到了阈值(或一定比例)则启动熔断机制,转为打开状态;

打开状态

此时对下游的调用都内部直接返回错误,不走网络,但设计了一个时钟选项,默认的时钟达到了一定时间(这个时间一般设置成平均故障处理时间,也就是MTTR),到了这个时间,进入半熔断状态;

半熔断状态

允许定量的服务请求,如果调用都成功(或一定比例)则认为恢复了,关闭熔断器,否则认为还没好,又回到熔断器打开状态;

10.4. 降级

所谓降级,就是指在Hystrix执行功能调用失败的情况下,我们如何处理,比如我们返回默认值等。

常见的降级处理如下:

快速失败

不做处理,直接抛异常

无声失败

返回null,空Map,空List

静态返回

返回默认值或自己组装一个值。

远程缓存

在失败的情况下再发起一次remote请求,不过这次请求的是一个缓存比如redis。由于是又发起一起远程调用,所以会重新封装一次Command,这个时候要注意,执行fallback的线程一定要跟主线程区分开,也就是重新命名一个ThreadPoolKey。

主次方式回退

我们日常开发中需要上线一个新功能,为了防止新功能上线失败,可以在新功能失败时回退到老的代码,

综上所述,Hystrix提供以下功能

  1. Hystrix使用命令模式HystrixCommand(Command)包装依赖调用逻辑,每个命令在单独线程中/信号授权下执行。

  2. 可配置依赖调用超时时间,超时时间一般设为比99.5%平均时间略高即可.当调用超时时,直接返回或执行fallback逻辑。

  3. 为每个依赖提供一个小的线程池(或信号),如果线程池已满调用将被立即拒绝,默认不采用排队.加速失败判定时间。

  4. 依赖调用结果分:成功,失败(抛出异常),超时,线程拒绝,短路。 请求失败(异常,拒绝,超时,短路)时执行fallback(降级)逻辑。

  5. 提供熔断器组件,可以自动运行或手动调用,停止当前依赖一段时间(10秒),熔断器默认错误率阈值为50%,超过将自动运行。

  6. 提供近实时依赖的统计和监控

10.5. Hystrix使用及配置

此文档只介绍在SpringBoot环境下的使用和配置,非SpringBoot环境使用理论上是一样的,但是需要自己额外做一些配置,请自行百度。

在项目中添加以下依赖:

pom.xml
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>

此依赖已包含在zaf-core-1.3.0-RELEASE的依赖中,如果你引用了zaf-core-1.3.0-RELEASE,可以不用单独引用此依赖。

在启动类上添加@EnableCircuitBreaker注解

Demo.java
@EnableCircuitBreaker
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

在调用远程服务的方法上添加@HystrixCommand注解

DemoService.java
@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand(fallbackMethod = "helloFallback", (1)
    	commandProperties = {
	      	@HystrixProperty(name="execution.isolation.strategy", value="THREAD") (2)
	    })
    public String helloService(){
        return restTemplate.getForEntity("http://hello-service/index",
                String.class).getBody();
    }

    public String helloFallback(){
        return "error";
    }
}
1 指定降级时要调用的方法
2 指定隔离方式为信号量隔离

在上面的代码中,对helloService方法的调用会被隔离执行,隔离的策略是线程隔离,具体隔离时使用的线程池的参数,熔断器的行为参数等等都使用默认值,当服务请求失败或者熔断器打开时,调用helloFallback执行降级逻辑。

每个被@HystrixCommand注解的方法称为一个命令,@HystrixCommand属性详细说明如下:

HystrixCommand.java
public @interface HystrixCommand {

    // 命令所属的组的名称:默认为注解方法所属类的名称
    // Hystrix使用命令分组将一起的命令进行管理,比如报告、警报、仪表盘或组/库。
    String groupKey() default "";

    // HystrixCommand 命令的key值,默认值为注解方法的名称。
    String commandKey() default "";

    // 线程池名称,默认定义为groupKey
    String threadPoolKey() default "";

    // 定义回退方法的名称, 此方法必须和hystrix的执行方法在相同类中,并且和命令方法的声明格式相同(同样的参数和返回值)
    String fallbackMethod() default "";

    // 配置hystrix命令的参数,下面会细说
    HystrixProperty[] commandProperties() default {};

    // 配置hystrix依赖的线程池的参数 下面会细说
     HystrixProperty[] threadPoolProperties() default {};

    // 如果hystrix方法抛出的异常包括RUNTIME_EXCEPTION,则会被封装HystrixRuntimeException异常。我们也可以通过此方法定义哪些需要忽略的异常
    Class<? extends Throwable>[] ignoreExceptions() default {};

    // 定义执行hystrix observable的命令的模式,类型详细见ObservableExecutionMode
    ObservableExecutionMode observableExecutionMode() default ObservableExecutionMode.EAGER;

    // 如果hystrix方法抛出的异常包括RUNTIME_EXCEPTION,则会被封装HystrixRuntimeException异常。此方法定义需要抛出的异常
    HystrixException[] raiseHystrixExceptions() default {};

    // 定义默认回调方法:但是defaultFallback不能传入参数,返回参数和hystrix的命令兼容
    String defaultFallback() default "";
}

hystrix的参数名分成两类:命令参数和线程池参数,对应的配置项的开头分别为 hystrix.command 和 hystrix.threadpool。对应@HystrixCommand注解的commandProperties参数和threadPoolProperties参数。

hystrix的参数值分成3级:全局默认值,全局动态值和实例值。 优先级为:实例值>全局动态值>全局默认值

拿隔离方式这个配置项举个例子。

  1. 隔离方式是一个命令参数。以为着它的配置项以 hystrix.command 开头

  2. 隔离方式的参数名是execution.isolation.strategy

  3. 隔离方式的默认值是THREAD

首先,如果不做任何配置,写下如下代码:

HelloService.java
@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand
    public String helloService(){
        return restTemplate.getForEntity("http://hello-service/index",
                String.class).getBody();
    }

}

那么,helloService方法的调用,会采用线程方式隔离,这是参数的全局默认值决定的。

如果我在配置文件中加入以下配置项:

hystrix.command.default.execution.isolation.strategy = SEMAPHORE

那么,包括helloService方法在内,系统内所有以@HystrixCommand注解的方法,都会采用信号方式隔离。这是由于全局动态值覆盖了全局默认值。

如果我继续在注解上加入以下配置:

DemoService.java
@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand(
    	commandProperties = {
	      	@HystrixProperty(name="execution.isolation.strategy", value="THREAD")
	    })
    public String helloService(){
        return restTemplate.getForEntity("http://hello-service/index",
                String.class).getBody();
    }
}

那么,只有helloService方法的调用,会采用线程方式隔离,其它@HystrixCommand注解的方法,仍然采用信号方式隔离,这是由于实例值覆盖了全局动态值。

OK,下面介绍所有的命令参数和线程池参数。

命令参数在配置文件配置时需要加 hystrix.command.default. 前缀,在@HystrixCommand注解的 commandProperties 属性配置时不需要。
线程池参数在配置文件配置时需要加 hystrix.threadpool.default. 前缀,在@HystrixCommand注解的 threadPoolProperties 属性配置时不需要。
Table 1. 命令参数
参数名称 默认值 参数描述

execution.isolation.strategy

THREAD

隔离策略,有THREAD和SEMAPHORE

默认使用THREAD模式,以下几种场景可以使用SEMAPHORE模式:

只想控制并发度

外部的方法已经做了线程隔离

调用的是本地方法或者可靠度非常高、耗时特别小的方法

hystrix.command.defalut.execution.isolation.thread.timeoutInMilliseconds

1000

超时时间,一般设成TP99的时间

execution.timeout.enabled

true

超时时间的设置是否生效

execution.isolation.thread.interruptOnTimeout

true

在发生超时时是否应中断命令执行,THREAD模式有效

execution.isolation.thread.interruptOnCancel

false

当发生取消时,执行是否应该中断,THREAD模式有效

execution.isolation.semaphore.maxConcurrentRequests

10

该参数当使用SEMAPHORE策略时才有效。如果超出该并发量,则超出的会被拒绝;且该值必需小于容器的线程池大小,否则并不起保护作用。

fallback.enabled

true

在发生失败或拒绝时是否尝试调用降级方法。

fallback.isolation.semaphore.maxConcurrentRequests

10

降级方法最大的并发调用数,如果超出该数,则后续降级方法调用会被拒绝

circuitBreaker.enabled

true

断路器是否起作用

circuitBreaker.requestVolumeThreshold

20

一个rolling window(滚动窗口,下面会讲)内最小的请求数。如果设为20,那么当一个rolling window的时间内(比如说1个rolling window是10秒)收到19个请求,即使19个请求都失败,也不会触发熔断。简单说就是设一个流量阈值,当流量大于这个阈值时,熔断器才起作用

circuitBreaker.sleepWindowInMilliseconds

5000

触发短路的时间值,当该值设为5000时,则当触发熔断后的5000毫秒内都会拒绝request,也就是5000毫秒后才会关闭circuit。

circuitBreaker.errorThresholdPercentage

50

错误比率阀值,如果错误率>=该值,熔断器会被打开,并短路所有请求触发fallback

circuitBreaker.forceOpen

false

是否强制打开断路器,如果打开则会拒绝所有的请求,优先级比circuitBreaker.forceClosed高

circuitBreaker.forceClosed

false

是否强制关闭断路器,是则允许所有的请求,无视错误率

metrics.rollingStats.timeInMilliseconds

10000

设置统计的时间窗口(rolling window)值的,毫秒值,熔断器是否打开会根据1个rolling window的统计来计算。若rolling window被设为10000毫秒,则rolling window会被分成n个buckets,每个bucket包含success,failure,timeout,rejection的次数的统计信息

metrics.rollingStats.numBuckets

10

是设置一个rolling window被划分的数量,若numBuckets=10,rolling window=10000,那么一个bucket的时间即1秒。必须符合rolling window % numberBuckets == 0。

metrics.rollingPercentile.enabled

true

是否开启百分数和均值统计,这些数据将用于在dashboard中监控所有被保护方法的健康状况。

metrics.rollingPercentile.timeInMilliseconds

60000

此属性设置滚动百分比窗口的持续时间,其中保留执行时间以允许百分比计算 (以毫秒为单位),此属性只影响初始度量值的创建, 并且在启动后对此属性进行的调整将不会生效

metrics.rollingPercentile.numBuckets

6

属性设置滚动百分比窗口划分的桶数,例如,滚动百分比窗口持续时间为60秒,默认配置6个桶,那么一个桶用于存放10秒的统计数据。配置值必须符合以下条件 metrics.rollingPercentile.timeInMilliseconds % metrics.rollingPercentile.numBuckets == 0,否则会抛出异常。此属性只影响初始度量值的创建,并且在启动后对此属性进行的调整将不会生效

metrics.rollingPercentile.bucketSize

100

此属性设置每个桶保留的最大执行时间数,如果超出桶的最大执行数量,则会记录到下一个桶.此属性只影响初始度量值的创建,并且在启动后对此属性进行的调整将不会生效

metrics.healthSnapshot.intervalInMilliseconds

500

此属性设置在允许运行健康快照以计算成功和错误百分比并影响断路器状态的等待间隔的时间 (以毫秒为单位)。误差百分比的连续计算是 CPU 密集型的,因此此属性允许您控制计算的频率

Table 2. 线程池参数
参数名称 默认值 参数描述

coreSize

10

此属性配置线程池大小,大部分情况下,10是一个合适的值。而且经常应该设的更小。

maximumSize

10

此属性设置最大线程池大小。这是在拒绝命令执行前的最大并发量。请注意, 您必须同时设置 allowMaximumSizeToDivergeFromCoreSize 属性,这个属性才会生效。

allowMaximumSizeToDivergeFromCoreSize

false

maximumSize的配置是否生效,如果maximumSize 大于 coreSize 配置,则在 keepAliveTimeMinutes 时间后回收线程

keepAliveTimeMinutes

1

此属性设置线程空闲生存时间 (分钟)

maxQueueSize

-1

请求等待队列

queueSizeRejectionThreshold

5

此属性设置队列大小拒绝阈值 - 即使未达到maxQueueSize也将发生拒绝的人为最大队列大小。此属性存在,因为BlockingQueue的maxQueueSize不能动态更改,我们希望允许您动态更改影响拒绝的队列大小。注意:如果maxQueueSize == -1,则此属性不适用。

metrics.rollingStats.timeInMilliseconds

10000

此属性设置statistical rolling窗口的持续时间(以毫秒为单位)。 这是为线程池保留多长时间。

metrics.rollingStats.numBuckets

10

此属性设置滚动统计窗口划分的桶数。 注意:以下必须为true - “metrics.rollingStats.timeInMilliseconds%metrics.rollingStats.numBuckets == 0” -否则将引发异常。

10.6. Hystrix监控

当我们为服务调用加上Hystrix以后,我们会希望可以随时了解这些被保护的服务的状态,比如断路器的状态,服务的压力,线程池的情况等等。这些信息,可以通过Hystrix的监控面板获取。

服务部署之前,需要联系架构部门,将你的应用的spring.application.name加到监控面板中,之后才能看到你的服务。
服务部署以后,需要有流量访问以后才能看到流量信息,没访问过的服务,在监控面板中看不到。
只有加了@HystrixCommand注解的方法才会出现。

监控面板中各参数的意义如下图所示:

turbine
Figure 47. turbine

11. 容器化部署

微服务落地的过程中,"部署"将是你会面对的最大的挑战之一。

一个普通的业务系统迁移到微服务架构后,会变成几十个微服务应用,为了保证每个服务的高可用,每个应用都应该被集群化部署,这时,你需要维护的应用实例将以百计。而一个复杂的大型系统,这个数字会上千。

如何高效的部署、监控、运维、迭代这成百上千的服务实例,并保证其高可用,就成了一个棘手的问题。

此外,每个应用都有自己的特点,有的白天忙,有的晚上忙,有的需要大量内存,有的会频繁的读写磁盘。但是按我们目前的资源使用方式,一旦某个应用申请了几台机器,那么这几台机器的资源就都归这个应用了。不管这个应用忙还是闲,都会占着这些资源。如何最大化的利用资源,让不忙的系统把CPU内存等资源让给忙的系统用,是另一个需要解决的问题.

解决这些问题的一个方案,就是容器化和容器编排。

container
Figure 48. container

容器编排过程中涉及的核心概念如上图所示:

  1. 微服务应用:这是可执行的程序包,最常见的形式就是SpringBoot的可执行jar,也可能是war包,或者php的文件等等。

  2. 镜像容器: 镜像的作用是将不同语言,不同格式的服务应用统一成一个格式。容器是镜像的实例,类似面向对象中 Class 和 instance 的关系。镜像里包含了服务应用本身,以及运行应用所需要的环境,比如SpringBoot的可执行jar需要JDK环境才能运行。war包需要tomcat才能运行。这些东西(jdk,tomcat等)都会被作为镜像的一部分放入镜像中。而用来描述镜像中包含哪些东西的文件,就是Dockerfile。镜像可以多次运行,每运行一次就会产生一个容器。相当于你的应用被部署了一次。镜像是只读的,容器是可写的。

  3. Pod:Pod是容器云平台的最小调度单位。一个Pod可以包含多个容器。你应该保证Pod的功能是单一的。你可以通过deployment.yml来描述如何将容器组装成Pod,并且要求云平台必须保证可用Pod的数量。

  4. Service: Service是一个逻辑概念,一组相同功能的Pod组成一个Service。因为Pod会随着集群中节点资源的变化调度,而Service则提供了一个不可变的对外端口。你可以通过service.yml来描述如何将Pod组装成service以及如何对外提供服务。

  5. Application: 多个服务最终组成你的应用。

通过上面的描述,你应该了解到,如果要将你的应用部署到容器云平台,你需要最少3个配置文件:Dockerfile, deployment.yml 和 service.yml。

如果你的应用是SpringBoot写的,那么恭喜你,你中头彩了,这3个文件我们都已经帮你处理了,你什么都不需要写。

截止到10.1,我们处理了 war + tomcat 的部署方式,如果你的应用是war + tomcat 部署的,你也不需要额外写东西了。

如果你的应用不是Java写的,请直接联系我们,我们会帮你一起把你的应用放到容器云上。

11.1. Java应用

OK,下面我们首先来看一下如何把Java应用部署到容器云平台。

app
Figure 49. app

这是一个典型的Maven多模块应用,其中红框的文件夹是我们要部署的应用。针对这个文件夹,你需要确认以下信息:

  • 这个文件夹下有个pom.xml文件。

  • pom文件的artifactId与文件夹的名字相同

artifactId
Figure 50. artifactId
  • pom的build块的finalName和文件夹名字相同

finalName
Figure 51. finalName

OK,满足这些条件以后,就可以到jenkins ( http://101.37.148.182:50032/ ) 上创建一个构建项目了。

jenkins
Figure 52. jenkins

项目的名字,必须是文件夹的名字

下面的复制框里,填zaf-test,这是一个测试好的构建,直接复制即可。

在项目信息页面,修改项目描述

jenkins
Figure 53. jenkins

拉到最下面,在流水线脚本里修改REPOSITORY变量的值,改成你项目的git仓库的地址。

jenkins
Figure 54. jenkins

OK,完成了,现在你可以开始构建了。

jenkins
Figure 55. jenkins

构建参数说明如下

packege

部署方式

jar:Spring Boot 可执行jar的部署方式选这个

war:war + tomcat的部署方式选这个

更改选项的顺序可以改变默认值,如果你的应用是war包部署,请修改上面的选项,把war放在第一个,这样不用每次部署时都选一下。

branch

要拉取的代码分支

env

下拉选择要部署的环境,

domain

用于指定是否自动生成域名。如果你的应用有页面,需要别人通过域名来访问,请勾选这个。默认生成的域名是"项目名.环境(ENV小写).zkh360.com"比如项目名是a,上面那个ENV选的是FAT,那么生成的域名是a.fat.zkh360.com。你需要找运维,把这个域名指向101.37.179.171(生产环境域名指向 47.110.167.5)这个IP。这样,当你的项目部署好以后,就可以通过这个域名访问了。

如果勾选了domain框,请保证你的服务运行于8080端口,由于应用都是在独立的容器中运行,不用担心端口会冲突。
所有的微服务都不需要生成点domain框,因为微服务统一通过网关访问。
在生产环境下,自动生成的域名不会包含环境信息,按上面那个例子,应用a在部署到PRO环境时,自动生成的域名是 a.zkh360.com
domainurl

如果你的应用已经有在使用的域名,不想用脚本自动生成的域名,可以在这里指定你的应用对应的域名。记得要将域名映射指定到101.37.179.171(生产环境域名指向 47.110.167.5)

每个参数都可以设置默认值,请根据你的实际情况设置默认值。比如是否生成域名。
在容器云内部的应用,用http互相访问时,最好不要用外网的域名,因为会先从容器云出去公网,再回到容器云里,性能会比较差。每一个部署在容器云里的应用都会被分配一个容器云的内部域名,容器云里的应用可以用这个域名互相访问,比如你的应用叫 a,部署在fat环境下,那么,从容器云外部,可以用 a.fat.zkh360.com访问,在容器云内部,可以用 a.zkh-fat.svc.cluster.local:8080 来访问,这样访问时,请求不会出去公网,只在容器云局域网内,性能会比较好。 内部域名的规则是: 应用名.zkh-环境.svc.cluster.local,端口统一是8080,外部域名的规则是: 应用名.环境.zkh360.com 端口统一是 80

点击开始构建即可。构建会图形化的显示出来,也可以点击左侧的小进度条,查看控制台的输出

jenkins
Figure 56. jenkins
jenkins
Figure 57. jenkins

一切顺利的话,你的应用就会部署成功,然后通过相应环境的eureka控制台就可以看到服务已经上线了。

http://config.fat.zkh360.com/

如果你为你的应用生成或指定了域名,那么通过这个域名也可以访问应用了。

jenkins
Figure 58. jenkins

默认情况下,每个应用会部署两个实例,以保证高可用。

11.2. war + tomcat应用

如果你的应用是war+tomcat部署的,只要把package选成war即可。

11.3. 查看日志

如果服务构建成功了,但是服务启动时失败,或者想查一下服务的日志,可以登录阿里云的日志服务查看。

登录地址:

https://signin.aliyun.com/1602027139684780/login.htm?spm=5176.2020520153.10101.d1.44a243f7dXL1dM

用户名密码:

zkh360_log/Zkh360-com
jenkins
Figure 59. jenkins

在日志服务首页,选择k8s-log开头的项目。

10月22日以后,我们为生产环境单开了一个集群,部署到PRO环境的应用,请到2018-10-22的那个project里看日志。
jenkins
Figure 60. jenkins

在项目的logstore列表页,点击k8s-stdout的查询按钮

jenkins
Figure 61. jenkins
jenkins
Figure 62. jenkins

在查询页面里,左侧红框中的_namespace_会列出目前集群中的环境,如zkh-fat,zkh-uat,点击某个环境,会过滤出这个环境中的日志。

_pod_name_会列出目前被收集日志的所有应用,每个应用部署两个实例,点击你的应用的名字开头的实例,会过滤出这个应用实例的日志。

你所选择的_namespace_ 和 _pod_name_都会作为查询条件添加到页面上方的搜索框中,你可以在这些条件后面加入 AND 加你要搜索的关键字来查找指定的日志。

被筛选出来的日志,包含很多字段,其中content字段是真正的日志内容。

11.4. Https

为了配合公司信息安全建设要求,容器云平台会做一些相关调整,现说明如下:

1.微服务应用。

微服务应用是指部署在容器云上,没有对外的域名,外界统一通过微服务网关访问的应用,如sap-s, sim等。

  1. 微服务网关地址改为: gateway.fat.zkh360.com(开发),gateway.uat.zkh360.com(测试),gateway.zkh360.com(生产)

  2. 新的微服务网关需要通过https访问。

  3. 原有的api开头的http服务网关目前仍然可用。

  4. 原有的api开头的http网关将在2018年12月25日关闭。

2.非微服务应用

非微服务应用是指部署在容器云上,有自己的对外域名,外界通过各个应用自己的域名访问各个应用。如superrom-web, 安全中心等。

1)所有非微服务应用请按照如下规则来统一命名域名:

  1. 开发环境:*.fat.zkh360.com

  2. 测试环境:*.uat.zkh360.com

  3. 生产环境:*.zkh360.com

目前容器云平台对非微服务应用会自动按照以上规则生成域名,没有使用容器云自动生成的域名的应用,请按照以上规则调整域名,或使用容器云自动生成的域名。

2)2018年12月25日之后,所有通过容器云部署的非微服务应用,将自动加入https配置,强制使用https访问。 3) 2018年12月25日之前,所有通过容器云部署的非微服务应用,仍然使用http访问,如果你需要立即将你的应用升级成https访问,请联系我。

所有开发测试环境的https访问,使用自签证书,自签证书在浏览器中会有不安全的提示,但不影响服务的正常访问。 所有生产环境的https访问,使用机构颁发的证书,不会有安全提示,可以正常访问。

11.5. 滚动更新

默认情况下,容器云平台在更新应用时遵循以下时间线。

jenkins
Figure 63. jenkins

简单来说,由于云平台不知道容器内部的应用何时启动完成,所以,默认情况下云平台是以容器本身创建成功作为一个信号的。一旦容器创建成功,云平台就会销毁老的容器,而不管容器内部的应用是否启动完成了。如上图所示,这会导致在集群中出现一段真空期,就是老容器都销毁了,新容器里的应用还没起来。

为了避免这种情况,我们需要将容器云平台中更新应用的时间线变为如下图所示

jenkins
Figure 64. jenkins

在新的时间线中,云平台会等新容器中的应用启动成功以后,才销毁老容器,这样就可以保证集群中总是存在可用的服务实例。

为了达到这个目的,你的应用需要满足以下条件

在应用的8080端口提供一个http协议的/ready服务。

换句话说,你在你自己机器上正常启动你的应用后,应该可以访问 http://127.0.0.1:8080/ready,并且返回的http状态码是200.

我们将在2019年2月1日启动这个探针,也就是说,如果你的应用在2019年2月1日以后没有/ready这个服务,那么将无法正常部署到云平台,因为云平台会认为你的应用没有正常启动。

如果你用了zaf框架的客户端 zaf-core,只需要把客户端升级到 1.3.2-RELEASE即可。里面包含了/ready服务。

如果你急切的需要这个探针,那么尽快将/ready服务加上,然后联系我们,我们会先为你单独开通这个探针。
如果你的应用无法提供Http服务,也请联系我们为你单独定义探针。
下面的信息非常重要,仔细看。
滚动更新只能保证集群中不出现服务真空期,并不能保证请求每次都会被路由到可用的实例上。为了避免偶然的网络抖动导致正常的服务实例被摘除,服务注册中心会在某个实例失去心跳一段时间后才将此服务实例从实例列表中摘除。在这段时间内,网关仍然会将请求路由到失去心跳的实例上,从而导致服务调用失败。为了避免这个问题,需要服务的调用者在客户端进行重试。从上面的时间线上可以看到,在极端情况下,云平台中可能同时存在4个服务实例,所以客户端重试的次数不应该小于4,以确保在重试过程中,请求一定会落到可用的实例上。这个问题是Eureka服务注册中心的特性造成的,所以如果你的应用没有走微服务网关,而是自己有域名,那么理论上不会有这个问题。

下面是我测试时使用的一段代码,使用这段代码,在整个dome服务更新的过程中,没有出现过调用失败,供参考。

public static void main(String[] args) throws Exception {

	HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
	factory.setReadTimeout(1000);
	factory.setConnectTimeout(1000);

	RestTemplate restTemplate = new RestTemplate(factory);
	int retry = 0;
	while (true) {

		try {

			String resp = restTemplate.getForObject("https://gateway.zkh360.com/service-demo/greeting?user=1", String.class);
			System.out.println(resp);
			Thread.sleep(2000);
			retry = 0;

		} catch (Exception e) {
			retry = retry + 1;
			if(retry == 4) {
				System.out.println("调用失败!!!!!!!!!!!!!");
				retry = 0;
				Thread.sleep(2000);
			}
		}
	}

}

11.6. 常见问题

Maven构建失败

默认情况下,流水线脚本是针对一个与当前项目同名的目录下的pom.xml执行mvn命令的,如下图所示:

jenkins
Figure 65. jenkins

如果你的pom.xml不在这个位置,你需要修改这里,指定你的pom.xml的位置。

项目命名问题

你的项目名称不能包含下划线,请将下划线改为中横线

Session共享问题

由于云平台默认会部署两个实例来保证系统高可用,所以请确保你的项目不会因为集群化部署而产生Session共享问题。最常见的解决方案是加入Spring Session。过程非常简单,百度一下即可。

目前容器云平台已经上线了一段时间,也有许多应用在用了,借这个机会再推广一下,将你的应用部署到容器云平台你可以获得以下好处:

1.简单方便:无需申请机器,无需安装维护环境,一键将git中的项目代码部署到容器云上。 2.高可用:集群部署,每个应用最少部署两个节点,自动进行负载均衡。 3.故障自愈:当有应用实例不可用时,容器云会自动重新部署该实例。 4.安全:所有发布在容器云上的应用将自动配置为https访问。

后期计划中的特性包括: 1.一键回滚:当发布或升级失败时,一键回滚到之前发布成功的版本。 2.滚动更新:应用更新的过程中服务不中断。 3.监控报警:监控服务健康状况,当服务出现异常时自动通知相关人员。 4.自动扩缩容:根据应用压力情况自动增加或减少应用实例。 5.灰度发布:新老版本并存,引一部分流量到新版本,验证可用后再撤掉老版本。

更多新特性规划中。。。

12. 调用链监控

13. 指标监控

此功能主要用于实时监控容器的基本硬件指标并报警。

监控面板地址为:http://grafana.zkh360.com

用户名:zkh360

密码:1234.com

jenkins
Figure 66. jenkins

登录后可以看到两个 Dashborad,一个是开发测试环境的,一个是生产环境的。

jenkins
Figure 67. jenkins

选择相应环境的Dashboard进入后,首先可以看到集群资源(网络,CPU,内存,硬盘)的整体使用情况。在页面右上角可以设定数据展示的时间范围和刷新频率。

jenkins
Figure 68. jenkins

往下拉一点,就可以看到集群中所有应用的CPU,内存和网络的实时使用情况。

jenkins
Figure 69. jenkins

在图表的右侧,有所有应用的列表,可以通过点击具体的应用节点,来观察特定应用节点的实时资源使用情况

jenkins
Figure 70. jenkins

点击某个图表,可以全屏观看,也可以将数据导出为csv格式。

jenkins
Figure 71. jenkins
由于目前开发和测试环境在一个物理集群上,所以在开发测试的Dashborad中,每个应用默认会有4个节点,2个开发,2个测试,你需要联系运维来帮你确认下哪2个是开发节点,哪两个是测试节点,这里后面运维部门会做一下拆分,把开发测试的面板分开。
如果大家对监控有什么额外的需求或问题,可以在钉钉微服务支持群里联系雷广岩或者我。群号 21759080

14. 日志监控

提供2种方式接入
1)日志文件接入
  提供系统日志的路径给到运维,运维会进行日志收集配置。
2)容器云日志接入
  应用如果是部署在容器云的,建议大家把日志打印在/zkh/log目录下,系统会自动从指定目录进行收集日期。
以上2种方式都不需要开发进行任何维护,运维和技术人员会进行配置。

14.1. 日志查询地址

UAT 测试环境地址  http://120.55.242.14:5601/app/kibana,相关域名正在发布http://kibana.uat.zkh360.com:5601
Release 正式环境地址 http://101.37.148.182:5601/app/kibana,相关域名正在发布http://kibana.pro.zkh360.com:5601

15. 健康监控

16. 分布式缓存

17. 分布式锁

18. 分布式任务调度

19. 分布式事务